Merge pull request #36 from tomaka/datastore-arbitrary

Allow datastore to store values of arbitrary type
This commit is contained in:
Fredrik Harrysson 2017-11-16 10:37:35 +01:00 committed by GitHub
commit a62f9f801d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 458 additions and 459 deletions

View File

@ -7,5 +7,6 @@ authors = ["Parity Technologies <admin@parity.io>"]
base64 = "0.7" base64 = "0.7"
futures = "0.1" futures = "0.1"
parking_lot = "0.4" parking_lot = "0.4"
serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
tempfile = "2.2" tempfile = "2.2"

View File

@ -1,264 +1,271 @@
// Copyright 2017 Parity Technologies (UK) Ltd. // Copyright 2017 Parity Technologies (UK) Ltd.
// //
// Permission is hereby granted, free of charge, to any person obtaining a // Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"), // copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation // to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense, // the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the // and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions: // Software is furnished to do so, subject to the following conditions:
// //
// The above copyright notice and this permission notice shall be included in // The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. // all copies or substantial portions of the Software.
// //
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use Datastore; use Datastore;
use base64;
use futures::Future; use futures::Future;
use futures::stream::{Stream, iter_ok}; use futures::stream::{Stream, iter_ok};
use parking_lot::Mutex;
use query::{Query, naive_apply_query}; use query::{Query, naive_apply_query};
use serde_json::{from_reader, to_writer}; use serde::Serialize;
use serde_json::map::Map; use serde::de::DeserializeOwned;
use serde_json::{Map, from_value, to_value, from_reader, to_writer};
use serde_json::value::Value; use serde_json::value::Value;
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::HashMap;
use std::fs; use std::fs;
use std::io::Cursor; use std::io::Cursor;
use std::io::Error as IoError; use std::io::Error as IoError;
use std::io::ErrorKind as IoErrorKind; use std::io::ErrorKind as IoErrorKind;
use std::io::Read; use std::io::Read;
use std::path::PathBuf; use std::path::PathBuf;
use parking_lot::Mutex;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
/// Implementation of `Datastore` that uses a single plain JSON file. /// Implementation of `Datastore` that uses a single plain JSON file.
pub struct JsonFileDatastore { pub struct JsonFileDatastore<T>
path: PathBuf, where T: Serialize + DeserializeOwned
content: Mutex<Map<String, Value>>, {
path: PathBuf,
content: Mutex<HashMap<String, T>>,
} }
impl JsonFileDatastore { impl<T> JsonFileDatastore<T>
/// Opens or creates the datastore. If the path refers to an existing path, then this function where T: Serialize + DeserializeOwned
/// will attempt to load an existing set of values from it (which can result in an error). {
/// Otherwise if the path doesn't exist, a new empty datastore will be created. /// Opens or creates the datastore. If the path refers to an existing path, then this function
pub fn new<P>(path: P) -> Result<JsonFileDatastore, IoError> /// will attempt to load an existing set of values from it (which can result in an error).
where /// Otherwise if the path doesn't exist, a new empty datastore will be created.
P: Into<PathBuf>, pub fn new<P>(path: P) -> Result<JsonFileDatastore<T>, IoError>
{ where P: Into<PathBuf>
let path = path.into(); {
let path = path.into();
if !path.exists() { if !path.exists() {
return Ok(JsonFileDatastore { return Ok(JsonFileDatastore {
path: path, path: path,
content: Mutex::new(Map::new()), content: Mutex::new(HashMap::new()),
}); });
} }
let content = { let content = {
let mut file = fs::File::open(&path)?; let mut file = fs::File::open(&path)?;
// We want to support empty files (and treat them as an empty recordset). Unfortunately // We want to support empty files (and treat them as an empty recordset). Unfortunately
// `serde_json` will always produce an error if we do this ("unexpected EOF at line 0 // `serde_json` will always produce an error if we do this ("unexpected EOF at line 0
// column 0"). Therefore we start by reading one byte from the file in order to check // column 0"). Therefore we start by reading one byte from the file in order to check
// for EOF. // for EOF.
let mut first_byte = [0]; let mut first_byte = [0];
if file.read(&mut first_byte)? == 0 { if file.read(&mut first_byte)? == 0 {
// File is empty. // File is empty.
Map::new() HashMap::new()
} else { } else {
match from_reader::<_, Value>(Cursor::new(first_byte).chain(file)) { match from_reader::<_, Value>(Cursor::new(first_byte).chain(file)) {
Ok(Value::Null) => Map::new(), Ok(Value::Null) => HashMap::new(),
Ok(Value::Object(map)) => map, Ok(Value::Object(map)) => {
Ok(_) => { let mut out = HashMap::with_capacity(map.len());
return Err(IoError::new( for (key, value) in map.into_iter() {
IoErrorKind::InvalidData, let value = match from_value(value) {
"expected JSON object", Ok(v) => v,
)); Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)),
} };
Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)), out.insert(key, value);
} }
} out
}; }
Ok(_) => {
return Err(IoError::new(IoErrorKind::InvalidData, "expected JSON object"));
}
Err(err) => {
return Err(IoError::new(IoErrorKind::InvalidData, err));
}
}
}
};
Ok(JsonFileDatastore { Ok(JsonFileDatastore {
path: path, path: path,
content: Mutex::new(content), content: Mutex::new(content),
}) })
} }
/// Flushes the content of the datastore to the disk. /// Flushes the content of the datastore to the disk.
/// ///
/// This function can only fail in case of a disk access error. If an error occurs, any change /// This function can only fail in case of a disk access error. If an error occurs, any change
/// to the datastore that was performed since the last successful flush will be lost. No data /// to the datastore that was performed since the last successful flush will be lost. No data
/// will be corrupted. /// will be corrupted.
pub fn flush(&self) -> Result<(), IoError> { pub fn flush(&self) -> Result<(), IoError> {
// Create a temporary file in the same directory as the destination, which avoids the // Create a temporary file in the same directory as the destination, which avoids the
// problem of having a file cleaner delete our file while we use it. // problem of having a file cleaner delete our file while we use it.
let self_path_parent = self.path.parent().ok_or(IoError::new( let self_path_parent = self.path
IoErrorKind::Other, .parent()
"couldn't get parent directory of destination", .ok_or(IoError::new(
))?; IoErrorKind::Other,
let mut temporary_file = NamedTempFile::new_in(self_path_parent)?; "couldn't get parent directory of destination",
))?;
let mut temporary_file = NamedTempFile::new_in(self_path_parent)?;
let content = self.content.lock(); let content = self.content.lock();
to_writer(&mut temporary_file, &*content)?; to_writer(
temporary_file.sync_data()?; &mut temporary_file,
&content.iter().map(|(k, v)| (k.clone(), to_value(v).unwrap())).collect::<Map<_, _>>(),
)?; // TODO: panic!
temporary_file.sync_data()?;
// Note that `persist` will fail if we try to persist across filesystems. However that // Note that `persist` will fail if we try to persist across filesystems. However that
// shouldn't happen since we created the temporary file in the same directory as the final // shouldn't happen since we created the temporary file in the same directory as the final
// path. // path.
temporary_file.persist(&self.path)?; temporary_file.persist(&self.path)?;
Ok(()) Ok(())
} }
} }
impl Datastore for JsonFileDatastore { impl<T> Datastore<T> for JsonFileDatastore<T>
fn put(&self, key: Cow<str>, value: Vec<u8>) { where T: Clone + Serialize + DeserializeOwned + Default + Ord + 'static
let mut content = self.content.lock(); {
content.insert(key.into_owned(), Value::String(base64::encode(&value))); #[inline]
} fn put(&self, key: Cow<str>, value: T) {
let mut content = self.content.lock();
content.insert(key.into_owned(), value);
}
fn get(&self, key: &str) -> Option<Vec<u8>> { fn get(&self, key: &str) -> Option<T> {
let content = self.content.lock(); let content = self.content.lock();
// If the JSON is malformed, we just ignore the value. // If the JSON is malformed, we just ignore the value.
content.get(key).and_then(|val| match val { content.get(key).cloned()
&Value::String(ref s) => base64::decode(s).ok(), }
_ => None,
})
}
fn has(&self, key: &str) -> bool { fn has(&self, key: &str) -> bool {
let content = self.content.lock(); let content = self.content.lock();
content.contains_key(key) content.contains_key(key)
} }
fn delete(&self, key: &str) -> bool { fn delete(&self, key: &str) -> bool {
let mut content = self.content.lock(); let mut content = self.content.lock();
content.remove(key).is_some() content.remove(key).is_some()
} }
fn query<'a>( fn query<'a>(
&'a self, &'a self,
query: Query, query: Query<T>,
) -> Box<Stream<Item = (String, Vec<u8>), Error = IoError> + 'a> { ) -> Box<Stream<Item = (String, T), Error = IoError> + 'a> {
let content = self.content.lock(); let content = self.content.lock();
let keys_only = query.keys_only; let keys_only = query.keys_only;
let content_stream = iter_ok(content.iter().filter_map(|(key, value)| { let content_stream = iter_ok(content.iter().filter_map(|(key, value)| {
// Skip values that are malformed. // Skip values that are malformed.
let value = if keys_only { let value = if keys_only { Default::default() } else { value.clone() };
Vec::with_capacity(0)
} else {
match value {
&Value::String(ref s) => {
match base64::decode(s) {
Ok(s) => s,
Err(_) => return None,
}
}
_ => return None,
}
};
Some((key.clone(), value)) Some((key.clone(), value))
})); }));
// `content_stream` reads from the content of the `Mutex`, so we need to clone the data // `content_stream` reads from the content of the `Mutex`, so we need to clone the data
// into a `Vec` before returning. // into a `Vec` before returning.
let collected = naive_apply_query(content_stream, query) let collected = naive_apply_query(content_stream, query)
.collect() .collect()
.wait() .wait()
.expect("can only fail if either `naive_apply_query` or `content_stream` produce \ .expect("can only fail if either `naive_apply_query` or `content_stream` produce \
an error, which can't happen"); an error, which can't happen");
let output_stream = iter_ok(collected.into_iter()); let output_stream = iter_ok(collected.into_iter());
Box::new(output_stream) as Box<_> Box::new(output_stream) as Box<_>
} }
} }
impl Drop for JsonFileDatastore { impl<T> Drop for JsonFileDatastore<T>
#[inline] where T: Serialize + DeserializeOwned
fn drop(&mut self) { {
// Unfortunately there's not much we can do here in case of an error, as panicking would be #[inline]
// very bad. Similar to `File`, the user should take care to call `flush()` before dropping fn drop(&mut self) {
// the datastore. // Unfortunately there's not much we can do here in case of an error, as panicking would be
// // very bad. Similar to `File`, the user should take care to call `flush()` before dropping
// If an error happens here, any change since the last successful flush will be lost, but // the datastore.
// the data will not be corrupted. //
let _ = self.flush(); // If an error happens here, any change since the last successful flush will be lost, but
} // the data will not be corrupted.
let _ = self.flush();
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use {Query, Order, Filter, FilterTy, FilterOp}; use {Query, Order, Filter, FilterTy, FilterOp};
use Datastore; use Datastore;
use JsonFileDatastore; use JsonFileDatastore;
use futures::{Future, Stream}; use futures::{Future, Stream};
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
#[test] #[test]
fn open_and_flush() { fn open_and_flush() {
let temp_file = NamedTempFile::new().unwrap(); let temp_file = NamedTempFile::new().unwrap();
let datastore = JsonFileDatastore::new(temp_file.path()).unwrap(); let datastore = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
datastore.flush().unwrap(); datastore.flush().unwrap();
} }
#[test] #[test]
fn values_store_and_reload() { fn values_store_and_reload() {
let temp_file = NamedTempFile::new().unwrap(); let temp_file = NamedTempFile::new().unwrap();
let datastore = JsonFileDatastore::new(temp_file.path()).unwrap(); let datastore = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
datastore.put("foo".into(), vec![1, 2, 3]); datastore.put("foo".into(), vec![1, 2, 3]);
datastore.put("bar".into(), vec![0, 255, 127]); datastore.put("bar".into(), vec![0, 255, 127]);
datastore.flush().unwrap(); datastore.flush().unwrap();
drop(datastore); drop(datastore);
let reload = JsonFileDatastore::new(temp_file.path()).unwrap(); let reload = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
assert_eq!(reload.get("bar").unwrap(), &[0, 255, 127]); assert_eq!(reload.get("bar").unwrap(), &[0, 255, 127]);
assert_eq!(reload.get("foo").unwrap(), &[1, 2, 3]); assert_eq!(reload.get("foo").unwrap(), &[1, 2, 3]);
} }
#[test] #[test]
fn query_basic() { fn query_basic() {
let temp_file = NamedTempFile::new().unwrap(); let temp_file = NamedTempFile::new().unwrap();
let datastore = JsonFileDatastore::new(temp_file.path()).unwrap(); let datastore = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
datastore.put("foo1".into(), vec![1, 2, 3]); datastore.put("foo1".into(), vec![6, 7, 8]);
datastore.put("foo2".into(), vec![4, 5, 6]); datastore.put("foo2".into(), vec![6, 7, 8]);
datastore.put("foo3".into(), vec![7, 8, 9]); datastore.put("foo3".into(), vec![7, 8, 9]);
datastore.put("foo4".into(), vec![10, 11, 12]); datastore.put("foo4".into(), vec![10, 11, 12]);
datastore.put("foo5".into(), vec![13, 14, 15]); datastore.put("foo5".into(), vec![13, 14, 15]);
datastore.put("bar1".into(), vec![0, 255, 127]); datastore.put("bar1".into(), vec![0, 255, 127]);
datastore.flush().unwrap(); datastore.flush().unwrap();
let query = datastore let query = datastore.query(Query {
.query(Query { prefix: "fo".into(),
prefix: "fo".into(), filters: vec![
filters: vec![ Filter {
Filter { ty: FilterTy::ValueCompare(&vec![6, 7, 8].into()),
ty: FilterTy::ValueCompare(vec![6, 7, 8].into()), operation: FilterOp::NotEqual,
operation: FilterOp::Greater, },
}, ],
], orders: vec![Order::ByKeyDesc],
orders: vec![Order::ByKeyDesc], skip: 1,
skip: 1, limit: u64::max_value(),
limit: u64::max_value(), keys_only: false,
keys_only: false, })
}) .collect()
.collect() .wait()
.wait() .unwrap();
.unwrap();
assert_eq!(query[0].0, "foo4"); assert_eq!(query[0].0, "foo4");
assert_eq!(query[0].1, &[10, 11, 12]); assert_eq!(query[0].1, &[10, 11, 12]);
assert_eq!(query[1].0, "foo3"); assert_eq!(query[1].0, "foo3");
assert_eq!(query[1].1, &[7, 8, 9]); assert_eq!(query[1].1, &[7, 8, 9]);
} }
} }

View File

@ -1,27 +1,28 @@
// Copyright 2017 Parity Technologies (UK) Ltd. // Copyright 2017 Parity Technologies (UK) Ltd.
// //
// Permission is hereby granted, free of charge, to any person obtaining a // Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"), // copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation // to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense, // the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the // and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions: // Software is furnished to do so, subject to the following conditions:
// //
// The above copyright notice and this permission notice shall be included in // The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. // all copies or substantial portions of the Software.
// //
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
extern crate base64; extern crate base64;
#[macro_use] #[macro_use]
extern crate futures; extern crate futures;
extern crate parking_lot; extern crate parking_lot;
extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate tempfile; extern crate tempfile;
@ -36,26 +37,26 @@ pub use self::json_file::JsonFileDatastore;
pub use self::query::{Query, Order, Filter, FilterTy, FilterOp}; pub use self::query::{Query, Order, Filter, FilterTy, FilterOp};
/// Abstraction over any struct that can store `(key, value)` pairs. /// Abstraction over any struct that can store `(key, value)` pairs.
pub trait Datastore { pub trait Datastore<T> {
/// Sets the value of a key. /// Sets the value of a key.
fn put(&self, key: Cow<str>, value: Vec<u8>); fn put(&self, key: Cow<str>, value: T);
/// Returns the value corresponding to this key. /// Returns the value corresponding to this key.
// TODO: use higher-kinded stuff once stable to provide a more generic "accessor" for the data // TODO: use higher-kinded stuff once stable to provide a more generic "accessor" for the data
fn get(&self, key: &str) -> Option<Vec<u8>>; fn get(&self, key: &str) -> Option<T>;
/// Returns true if the datastore contains the given key. /// Returns true if the datastore contains the given key.
fn has(&self, key: &str) -> bool; fn has(&self, key: &str) -> bool;
/// Removes the given key from the datastore. Returns true if the key existed. /// Removes the given key from the datastore. Returns true if the key existed.
fn delete(&self, key: &str) -> bool; fn delete(&self, key: &str) -> bool;
/// Executes a query on the key-value store. /// Executes a query on the key-value store.
/// ///
/// This operation is expensive on some implementations and cheap on others. It is your /// This operation is expensive on some implementations and cheap on others. It is your
/// responsibility to pick the right implementation for the right job. /// responsibility to pick the right implementation for the right job.
fn query<'a>( fn query<'a>(
&'a self, &'a self,
query: Query, query: Query<T>,
) -> Box<Stream<Item = (String, Vec<u8>), Error = IoError> + 'a>; ) -> Box<Stream<Item = (String, T), Error = IoError> + 'a>;
} }

View File

@ -1,21 +1,21 @@
// Copyright 2017 Parity Technologies (UK) Ltd. // Copyright 2017 Parity Technologies (UK) Ltd.
// //
// Permission is hereby granted, free of charge, to any person obtaining a // Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"), // copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation // to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense, // the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the // and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions: // Software is furnished to do so, subject to the following conditions:
// //
// The above copyright notice and this permission notice shall be included in // The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. // all copies or substantial portions of the Software.
// //
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::{Stream, Future, Async, Poll}; use futures::{Stream, Future, Async, Poll};
@ -31,300 +31,290 @@ use std::vec::IntoIter as VecIntoIter;
/// The various modifications of the dataset are applied in the same order as the fields (prefix, /// The various modifications of the dataset are applied in the same order as the fields (prefix,
/// filters, orders, skip, limit). /// filters, orders, skip, limit).
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Query<'a> { pub struct Query<'a, T: 'a> {
/// Only the keys that start with `prefix` will be returned. /// Only the keys that start with `prefix` will be returned.
pub prefix: Cow<'a, str>, pub prefix: Cow<'a, str>,
/// Filters to apply on the results. /// Filters to apply on the results.
pub filters: Vec<Filter<'a>>, pub filters: Vec<Filter<'a, T>>,
/// How to order the keys. Applied sequentially. /// How to order the keys. Applied sequentially.
pub orders: Vec<Order>, pub orders: Vec<Order>,
/// Number of elements to skip from at the start of the results. /// Number of elements to skip from at the start of the results.
pub skip: u64, pub skip: u64,
/// Maximum number of elements in the results. /// Maximum number of elements in the results.
pub limit: u64, pub limit: u64,
/// Only return keys. If true, then all the `Vec`s of the data will be empty. /// Only return keys. If true, then all the `Vec`s of the data will be empty.
pub keys_only: bool, pub keys_only: bool,
} }
/// A filter to apply to the results set. /// A filter to apply to the results set.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Filter<'a> { pub struct Filter<'a, T: 'a> {
/// Type of filter and value to compare with. /// Type of filter and value to compare with.
pub ty: FilterTy<'a>, pub ty: FilterTy<'a, T>,
/// Comparison operation. /// Comparison operation.
pub operation: FilterOp, pub operation: FilterOp,
} }
/// Type of filter and value to compare with. /// Type of filter and value to compare with.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum FilterTy<'a> { pub enum FilterTy<'a, T: 'a> {
/// Compare the key with a reference value. /// Compare the key with a reference value.
KeyCompare(Cow<'a, str>), KeyCompare(Cow<'a, str>),
/// Compare the value with a reference value. /// Compare the value with a reference value.
ValueCompare(Cow<'a, [u8]>), ValueCompare(&'a T),
} }
/// Filtering operation. Keep in mind that anything else than `Equal` and `NotEqual` is a bit /// Filtering operation.
/// blurry.
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub enum FilterOp { pub enum FilterOp {
Less, Equal,
LessOrEqual, NotEqual,
Equal, Less,
NotEqual, LessOrEqual,
Greater, Greater,
GreaterOrEqual, GreaterOrEqual,
} }
/// Order in which to sort the results of a query. /// Order in which to sort the results of a query.
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub enum Order { pub enum Order {
/// Put the values in ascending order. /// Put the values in ascending order.
ByValueAsc, ByValueAsc,
/// Put the values in descending order. /// Put the values in descending order.
ByValueDesc, ByValueDesc,
/// Put the keys in ascending order. /// Put the keys in ascending order.
ByKeyAsc, ByKeyAsc,
/// Put the keys in descending order. /// Put the keys in descending order.
ByKeyDesc, ByKeyDesc,
} }
/// Naively applies a query on a set of results. /// Naively applies a query on a set of results.
pub fn naive_apply_query<'a, S>(stream: S, query: Query<'a>) pub fn naive_apply_query<'a, S, V>(stream: S, query: Query<'a, V>)
-> StreamTake<StreamSkip<NaiveKeysOnlyApply<NaiveApplyOrdered<NaiveFiltersApply<'a, NaivePrefixApply<'a, S>, VecIntoIter<Filter<'a>>>>>>> -> StreamTake<StreamSkip<NaiveKeysOnlyApply<NaiveApplyOrdered<NaiveFiltersApply<'a, NaivePrefixApply<'a, S>, VecIntoIter<Filter<'a, V>>>, V>>>>
where S: Stream<Item = (String, Vec<u8>), Error = IoError> + 'a where S: Stream<Item = (String, V), Error = IoError> + 'a,
V: Clone + Ord + Default + 'static
{ {
let prefixed = naive_apply_prefix(stream, query.prefix); let prefixed = naive_apply_prefix(stream, query.prefix);
let filtered = naive_apply_filters(prefixed, query.filters.into_iter()); let filtered = naive_apply_filters(prefixed, query.filters.into_iter());
let ordered = naive_apply_ordered(filtered, query.orders); let ordered = naive_apply_ordered(filtered, query.orders);
let keys_only = naive_apply_keys_only(ordered, query.keys_only); let keys_only = naive_apply_keys_only(ordered, query.keys_only);
naive_apply_skip_limit(keys_only, query.skip, query.limit) naive_apply_skip_limit(keys_only, query.skip, query.limit)
} }
/// Skips the `skip` first element of a stream and only returns `limit` elements. /// Skips the `skip` first element of a stream and only returns `limit` elements.
#[inline] #[inline]
pub fn naive_apply_skip_limit<S>(stream: S, skip: u64, limit: u64) -> StreamTake<StreamSkip<S>> pub fn naive_apply_skip_limit<S, T>(stream: S, skip: u64, limit: u64) -> StreamTake<StreamSkip<S>>
where where S: Stream<Item = (String, T), Error = IoError>
S: Stream<Item = (String, Vec<u8>), Error = IoError>,
{ {
stream.skip(skip).take(limit) stream.skip(skip).take(limit)
} }
/// Filters the result of a stream to empty values if `keys_only` is true. /// Filters the result of a stream to empty values if `keys_only` is true.
#[inline] #[inline]
pub fn naive_apply_keys_only<S>(stream: S, keys_only: bool) -> NaiveKeysOnlyApply<S> pub fn naive_apply_keys_only<S, T>(stream: S, keys_only: bool) -> NaiveKeysOnlyApply<S>
where where S: Stream<Item = (String, T), Error = IoError>
S: Stream<Item = (String, Vec<u8>), Error = IoError>,
{ {
NaiveKeysOnlyApply { NaiveKeysOnlyApply {
keys_only: keys_only, keys_only: keys_only,
stream: stream, stream: stream,
} }
} }
/// Returned by `naive_apply_keys_only`. /// Returned by `naive_apply_keys_only`.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct NaiveKeysOnlyApply<S> { pub struct NaiveKeysOnlyApply<S> {
keys_only: bool, keys_only: bool,
stream: S, stream: S,
} }
impl<S> Stream for NaiveKeysOnlyApply<S> impl<S, T> Stream for NaiveKeysOnlyApply<S>
where where S: Stream<Item = (String, T), Error = IoError>,
S: Stream<Item = (String, Vec<u8>), Error = IoError>, T: Default
{ {
type Item = (String, Vec<u8>); type Item = (String, T);
type Error = IoError; type Error = IoError;
#[inline] #[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.keys_only { if self.keys_only {
Ok(Async::Ready(try_ready!(self.stream.poll()).map(|mut v| { Ok(Async::Ready(try_ready!(self.stream.poll()).map(|mut v| {
v.1 = Vec::new(); v.1 = Default::default();
v v
}))) })))
} else { } else {
self.stream.poll() self.stream.poll()
} }
} }
} }
/// Filters the result of a stream to only keep the results with a prefix. /// Filters the result of a stream to only keep the results with a prefix.
#[inline] #[inline]
pub fn naive_apply_prefix<'a, S>(stream: S, prefix: Cow<'a, str>) -> NaivePrefixApply<'a, S> pub fn naive_apply_prefix<'a, S, T>(stream: S, prefix: Cow<'a, str>) -> NaivePrefixApply<'a, S>
where where S: Stream<Item = (String, T), Error = IoError>
S: Stream<Item = (String, Vec<u8>), Error = IoError>,
{ {
NaivePrefixApply { NaivePrefixApply { prefix: prefix, stream: stream }
prefix: prefix,
stream: stream,
}
} }
/// Returned by `naive_apply_prefix`. /// Returned by `naive_apply_prefix`.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct NaivePrefixApply<'a, S> { pub struct NaivePrefixApply<'a, S> {
prefix: Cow<'a, str>, prefix: Cow<'a, str>,
stream: S, stream: S,
} }
impl<'a, S> Stream for NaivePrefixApply<'a, S> impl<'a, S, T> Stream for NaivePrefixApply<'a, S>
where where S: Stream<Item = (String, T), Error = IoError>
S: Stream<Item = (String, Vec<u8>), Error = IoError>,
{ {
type Item = (String, Vec<u8>); type Item = (String, T);
type Error = IoError; type Error = IoError;
#[inline] #[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop { loop {
let item = try_ready!(self.stream.poll()); let item = try_ready!(self.stream.poll());
match item { match item {
Some(i) => { Some(i) => {
if i.0.starts_with(&*self.prefix) { if i.0.starts_with(&*self.prefix) {
return Ok(Async::Ready(Some(i))); return Ok(Async::Ready(Some(i)));
} }
} }
None => return Ok(Async::Ready(None)), None => return Ok(Async::Ready(None)),
} }
} }
} }
} }
/// Applies orderings on the stream data. Will simply pass data through if the list of orderings /// Applies orderings on the stream data. Will simply pass data through if the list of orderings
/// is empty. Otherwise will need to collect. /// is empty. Otherwise will need to collect.
pub fn naive_apply_ordered<'a, S, I>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S> pub fn naive_apply_ordered<'a, S, I, V>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S, V>
where where S: Stream<Item = (String, V), Error = IoError> + 'a,
S: Stream<Item = (String, Vec<u8>), Error = IoError> + 'a, I: IntoIterator<Item = Order>,
I: IntoIterator<Item = Order>, I::IntoIter: 'a,
I::IntoIter: 'a, V: Ord + 'static
{ {
let orders_iter = orders_iter.into_iter(); let orders_iter = orders_iter.into_iter();
if orders_iter.size_hint().1 == Some(0) { if orders_iter.size_hint().1 == Some(0) {
return NaiveApplyOrdered { inner: NaiveApplyOrderedInner::PassThrough(stream) }; return NaiveApplyOrdered { inner: NaiveApplyOrderedInner::PassThrough(stream) };
} }
let collected = stream let collected = stream.collect()
.collect() .and_then(move |mut collected| {
.and_then(move |mut collected| { for order in orders_iter {
for order in orders_iter { match order {
match order { Order::ByValueAsc => {
Order::ByValueAsc => { collected.sort_by(|a, b| a.1.cmp(&b.1));
collected.sort_by(|a, b| a.1.cmp(&b.1)); }
} Order::ByValueDesc => {
Order::ByValueDesc => { collected.sort_by(|a, b| b.1.cmp(&a.1));
collected.sort_by(|a, b| b.1.cmp(&a.1)); }
} Order::ByKeyAsc => {
Order::ByKeyAsc => { collected.sort_by(|a, b| a.0.cmp(&b.0));
collected.sort_by(|a, b| a.0.cmp(&b.0)); }
} Order::ByKeyDesc => {
Order::ByKeyDesc => { collected.sort_by(|a, b| b.0.cmp(&a.0));
collected.sort_by(|a, b| b.0.cmp(&a.0)); }
} }
} }
} Ok(iter_ok(collected.into_iter()))
Ok(iter_ok(collected.into_iter())) })
}) .flatten_stream();
.flatten_stream();
NaiveApplyOrdered { inner: NaiveApplyOrderedInner::Collected(Box::new(collected)) } NaiveApplyOrdered { inner: NaiveApplyOrderedInner::Collected(Box::new(collected)) }
} }
/// Stream returned by `naive_apply_ordered`.
pub struct NaiveApplyOrdered<'a, S, T> {
    // Either the untouched source stream or a boxed, already-sorted replay
    // of it; see `NaiveApplyOrderedInner`.
    inner: NaiveApplyOrderedInner<'a, S, T>,
}
enum NaiveApplyOrderedInner<'a, S, T> {
    /// No ordering was requested: items flow straight from the source stream.
    PassThrough(S),
    /// Orderings were applied: the source was collected, sorted, and boxed
    /// as a replay stream.
    Collected(Box<Stream<Item = (String, T), Error = IoError> + 'a>),
}
impl<'a, S> Stream for NaiveApplyOrdered<'a, S> impl<'a, S, V> Stream for NaiveApplyOrdered<'a, S, V>
where where S: Stream<Item = (String, V), Error = IoError>
S: Stream<Item = (String, Vec<u8>), Error = IoError>,
{ {
type Item = (String, Vec<u8>); type Item = (String, V);
type Error = IoError; type Error = IoError;
#[inline] #[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.inner { match self.inner {
NaiveApplyOrderedInner::PassThrough(ref mut s) => s.poll(), NaiveApplyOrderedInner::PassThrough(ref mut s) => s.poll(),
NaiveApplyOrderedInner::Collected(ref mut s) => s.poll(), NaiveApplyOrderedInner::Collected(ref mut s) => s.poll(),
} }
} }
} }
/// Filters the result of a stream to apply a set of filters. /// Filters the result of a stream to apply a set of filters.
#[inline] #[inline]
pub fn naive_apply_filters<'a, S, I>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I> pub fn naive_apply_filters<'a, S, I, V>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I>
where where S: Stream<Item = (String, V), Error = IoError>,
S: Stream<Item = (String, Vec<u8>), Error = IoError>, I: Iterator<Item = Filter<'a, V>> + Clone,
I: Iterator<Item = Filter<'a>> + Clone, V: 'a
{ {
NaiveFiltersApply { NaiveFiltersApply {
filters: filters, filters: filters,
stream: stream, stream: stream,
marker: PhantomData, marker: PhantomData,
} }
} }
/// Returned by `naive_apply_filters`.
#[derive(Debug, Clone)]
pub struct NaiveFiltersApply<'a, S, I> {
    /// Set of filters; cloned for each incoming entry so it can be iterated
    /// repeatedly.
    filters: I,
    /// Underlying key/value stream being filtered.
    stream: S,
    /// Carries the `'a` lifetime of the filters without storing a reference.
    marker: PhantomData<&'a ()>,
}
impl<'a, S, I> Stream for NaiveFiltersApply<'a, S, I> impl<'a, S, I, T> Stream for NaiveFiltersApply<'a, S, I>
where where S: Stream<Item = (String, T), Error = IoError>,
S: Stream< I: Iterator<Item = Filter<'a, T>> + Clone,
Item = (String, Vec<u8>), T: Ord + 'a
Error = IoError,
>,
I: Iterator<Item = Filter<'a>> + Clone,
{ {
type Item = (String, Vec<u8>); type Item = (String, T);
type Error = IoError; type Error = IoError;
#[inline] #[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
'outer: loop { 'outer: loop {
let item = try_ready!(self.stream.poll()); let item = try_ready!(self.stream.poll());
match item { match item {
Some(i) => { Some(i) => {
for filter in self.filters.clone() { for filter in self.filters.clone() {
if !naive_filter_test(&i, &filter) { if !naive_filter_test(&i, &filter) {
continue 'outer; continue 'outer;
} }
} }
return Ok(Async::Ready(Some(i))); return Ok(Async::Ready(Some(i)));
} }
None => return Ok(Async::Ready(None)), None => return Ok(Async::Ready(None)),
} }
} }
} }
} }
#[inline] #[inline]
fn naive_filter_test(entry: &(String, Vec<u8>), filter: &Filter) -> bool { fn naive_filter_test<T>(entry: &(String, T), filter: &Filter<T>) -> bool
let (expected_ordering, revert_expected) = match filter.operation { where T: Ord
FilterOp::Less => (Ordering::Less, false), {
FilterOp::LessOrEqual => (Ordering::Greater, true), let (expected_ordering, revert_expected) = match filter.operation {
FilterOp::Equal => (Ordering::Less, false), FilterOp::Equal => (Ordering::Equal, false),
FilterOp::NotEqual => (Ordering::Less, true), FilterOp::NotEqual => (Ordering::Equal, true),
FilterOp::Greater => (Ordering::Greater, false), FilterOp::Less => (Ordering::Less, false),
FilterOp::GreaterOrEqual => (Ordering::Less, true), FilterOp::GreaterOrEqual => (Ordering::Less, true),
}; FilterOp::Greater => (Ordering::Greater, false),
FilterOp::LessOrEqual => (Ordering::Greater, true),
};
match filter.ty { match filter.ty {
FilterTy::KeyCompare(ref ref_value) => { FilterTy::KeyCompare(ref ref_value) => {
((&*entry.0).cmp(&**ref_value) == expected_ordering) != revert_expected ((&*entry.0).cmp(&**ref_value) == expected_ordering) != revert_expected
} }
FilterTy::ValueCompare(ref ref_value) => { FilterTy::ValueCompare(ref ref_value) => {
((&*entry.1).cmp(&**ref_value) == expected_ordering) != revert_expected (entry.1.cmp(&**ref_value) == expected_ordering) != revert_expected
} }
} }
} }