From b82f56ac3e3b922db038342c147cfbfa4f77ecdc Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Wed, 15 Nov 2017 17:25:06 +0100
Subject: [PATCH 1/2] Allow datastore to store values of arbitrary type

---
 datastore/Cargo.toml       |   1 +
 datastore/src/json_file.rs |  99 ++++++++++++++++++++---------------
 datastore/src/lib.rs       |  11 ++--
 datastore/src/query.rs     | 102 ++++++++++++++++++++-----------------
 4 files changed, 118 insertions(+), 95 deletions(-)

diff --git a/datastore/Cargo.toml b/datastore/Cargo.toml
index a65c2f8c..e2e7e1f7 100644
--- a/datastore/Cargo.toml
+++ b/datastore/Cargo.toml
@@ -7,5 +7,6 @@ authors = ["Parity Technologies "]
 base64 = "0.7"
 futures = "0.1"
 parking_lot = "0.4"
+serde = "1.0"
 serde_json = "1.0"
 tempfile = "2.2"
diff --git a/datastore/src/json_file.rs b/datastore/src/json_file.rs
index a6c1b2c5..1a2087e5 100644
--- a/datastore/src/json_file.rs
+++ b/datastore/src/json_file.rs
@@ -19,14 +19,15 @@
 // DEALINGS IN THE SOFTWARE.
 
 use Datastore;
-use base64;
 use futures::Future;
 use futures::stream::{Stream, iter_ok};
 use query::{Query, naive_apply_query};
-use serde_json::{from_reader, to_writer};
-use serde_json::map::Map;
+use serde::Serialize;
+use serde::de::DeserializeOwned;
+use serde_json::{Map, from_value, to_value, from_reader, to_writer};
 use serde_json::value::Value;
 use std::borrow::Cow;
+use std::collections::HashMap;
 use std::fs;
 use std::io::Cursor;
 use std::io::Error as IoError;
@@ -37,16 +38,20 @@ use parking_lot::Mutex;
 use tempfile::NamedTempFile;
 
 /// Implementation of `Datastore` that uses a single plain JSON file.
-pub struct JsonFileDatastore {
+pub struct JsonFileDatastore<T>
+	where T: Serialize + DeserializeOwned
+{
 	path: PathBuf,
-	content: Mutex<Map<String, Value>>,
+	content: Mutex<HashMap<String, T>>,
 }
 
-impl JsonFileDatastore {
+impl<T> JsonFileDatastore<T>
+	where T: Serialize + DeserializeOwned
+{
 	/// Opens or creates the datastore. If the path refers to an existing path, then this function
 	/// will attempt to load an existing set of values from it (which can result in an error).
 	/// Otherwise if the path doesn't exist, a new empty datastore will be created.
-	pub fn new<P>(path: P) -> Result<JsonFileDatastore, IoError>
+	pub fn new<P>(path: P) -> Result<JsonFileDatastore<T>, IoError>
 	where
 		P: Into<PathBuf>,
 	{
@@ -55,7 +60,7 @@ impl JsonFileDatastore {
 		if !path.exists() {
 			return Ok(JsonFileDatastore {
 				path: path,
-				content: Mutex::new(Map::new()),
+				content: Mutex::new(HashMap::new()),
 			});
 		}
@@ -70,18 +75,33 @@ impl JsonFileDatastore {
 			let mut first_byte = [0];
 			if file.read(&mut first_byte)? == 0 {
 				// File is empty.
-				Map::new()
+				HashMap::new()
 			} else {
 				match from_reader::<_, Value>(Cursor::new(first_byte).chain(file)) {
-					Ok(Value::Null) => Map::new(),
-					Ok(Value::Object(map)) => map,
+					Ok(Value::Null) => HashMap::new(),
+					Ok(Value::Object(map)) => {
+						let mut out = HashMap::with_capacity(map.len());
+						for (key, value) in map.into_iter() {
+							let value = match from_value(value) {
+								Ok(v) => v,
+								Err(err) => return Err(IoError::new(
+									IoErrorKind::InvalidData,
+									err,
+								)),
+							};
+							out.insert(key, value);
+						}
+						out
+					},
 					Ok(_) => {
 						return Err(IoError::new(
 							IoErrorKind::InvalidData,
 							"expected JSON object",
 						));
 					}
-					Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)),
+					Err(err) => {
+						return Err(IoError::new(IoErrorKind::InvalidData, err));
+					},
 				}
 			}
 		};
@@ -107,7 +127,8 @@ impl JsonFileDatastore {
 		let mut temporary_file = NamedTempFile::new_in(self_path_parent)?;
 
 		let content = self.content.lock();
-		to_writer(&mut temporary_file, &*content)?;
+		to_writer(&mut temporary_file,
+			&content.iter().map(|(k, v)| (k.clone(), to_value(v).unwrap())).collect::<Map<_, _>>())?; // TODO: panic!
 		temporary_file.sync_data()?;
 
 		// Note that `persist` will fail if we try to persist across filesystems. However that
@@ -118,19 +139,19 @@
 	}
 }
 
-impl Datastore for JsonFileDatastore {
-	fn put(&self, key: Cow<str>, value: Vec<u8>) {
+impl<T> Datastore<T> for JsonFileDatastore<T>
+	where T: Clone + Serialize + DeserializeOwned + Default + Ord + 'static
+{
+	#[inline]
+	fn put(&self, key: Cow<str>, value: T) {
 		let mut content = self.content.lock();
-		content.insert(key.into_owned(), Value::String(base64::encode(&value)));
+		content.insert(key.into_owned(), value);
 	}
 
-	fn get(&self, key: &str) -> Option<Vec<u8>> {
+	fn get(&self, key: &str) -> Option<T> {
 		let content = self.content.lock();
 		// If the JSON is malformed, we just ignore the value.
-		content.get(key).and_then(|val| match val {
-			&Value::String(ref s) => base64::decode(s).ok(),
-			_ => None,
-		})
+		content.get(key).cloned()
 	}
 
 	fn has(&self, key: &str) -> bool {
@@ -145,8 +166,8 @@
 
 	fn query<'a>(
 		&'a self,
-		query: Query,
-	) -> Box<Stream<Item = (String, Vec<u8>), Error = IoError> + 'a> {
+		query: Query<T>,
+	) -> Box<Stream<Item = (String, T), Error = IoError> + 'a> {
 		let content = self.content.lock();
 
 		let keys_only = query.keys_only;
@@ -154,17 +175,9 @@
 		let content_stream = iter_ok(content.iter().filter_map(|(key, value)| {
 			// Skip values that are malformed.
 			let value = if keys_only {
-				Vec::with_capacity(0)
+				Default::default()
 			} else {
-				match value {
-					&Value::String(ref s) => {
-						match base64::decode(s) {
-							Ok(s) => s,
-							Err(_) => return None,
-						}
-					}
-					_ => return None,
-				}
+				value.clone()
 			};
 
 			Some((key.clone(), value))
@@ -182,7 +195,9 @@
 	}
 }
 
-impl Drop for JsonFileDatastore {
+impl<T> Drop for JsonFileDatastore<T>
+	where T: Serialize + DeserializeOwned
+{
 	#[inline]
 	fn drop(&mut self) {
 		// Unfortunately there's not much we can do here in case of an error, as panicking would be
@@ -206,7 +221,7 @@ mod tests {
 	#[test]
 	fn open_and_flush() {
 		let temp_file = NamedTempFile::new().unwrap();
-		let datastore = JsonFileDatastore::new(temp_file.path()).unwrap();
+		let datastore = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
 		datastore.flush().unwrap();
 	}
 
@@ -214,13 +229,13 @@ mod tests {
 	fn values_store_and_reload() {
 		let temp_file = NamedTempFile::new().unwrap();
 
-		let datastore = JsonFileDatastore::new(temp_file.path()).unwrap();
+		let datastore = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
 		datastore.put("foo".into(), vec![1, 2, 3]);
 		datastore.put("bar".into(), vec![0, 255, 127]);
 		datastore.flush().unwrap();
 		drop(datastore);
 
-		let reload = JsonFileDatastore::new(temp_file.path()).unwrap();
+		let reload = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
 		assert_eq!(reload.get("bar").unwrap(), &[0, 255, 127]);
 		assert_eq!(reload.get("foo").unwrap(), &[1, 2, 3]);
 	}
 
@@ -229,9 +244,9 @@ mod tests {
 	fn query_basic() {
 		let temp_file = NamedTempFile::new().unwrap();
 
-		let datastore = JsonFileDatastore::new(temp_file.path()).unwrap();
-		datastore.put("foo1".into(), vec![1, 2, 3]);
-		datastore.put("foo2".into(), vec![4, 5, 6]);
+		let datastore = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
+		datastore.put("foo1".into(), vec![6, 7, 8]);
+		datastore.put("foo2".into(), vec![6, 7, 8]);
 		datastore.put("foo3".into(), vec![7, 8, 9]);
 		datastore.put("foo4".into(), vec![10, 11, 12]);
 		datastore.put("foo5".into(), vec![13, 14, 15]);
 		datastore.put("bar1".into(), vec![0, 255, 127]);
 		datastore.flush().unwrap();
 
@@ -243,8 +258,8 @@
 			prefix: "fo".into(),
 			filters: vec![
 				Filter {
-					ty: FilterTy::ValueCompare(vec![6, 7, 8].into()),
-					operation: FilterOp::Greater,
+					ty: FilterTy::ValueCompare(&vec![6, 7, 8].into()),
+					operation: FilterOp::NotEqual,
 				},
 			],
 			orders: vec![Order::ByKeyDesc],
diff --git a/datastore/src/lib.rs b/datastore/src/lib.rs
index 227c78fd..85281013 100644
--- a/datastore/src/lib.rs
+++ b/datastore/src/lib.rs
@@ -22,6 +22,7 @@ extern crate base64;
 #[macro_use]
 extern crate futures;
 extern crate parking_lot;
+extern crate serde;
 extern crate serde_json;
 extern crate tempfile;
 
@@ -36,13 +37,13 @@ pub use self::json_file::JsonFileDatastore;
 pub use self::query::{Query, Order, Filter, FilterTy, FilterOp};
 
 /// Abstraction over any struct that can store `(key, value)` pairs.
-pub trait Datastore {
+pub trait Datastore<T> {
 	/// Sets the value of a key.
-	fn put(&self, key: Cow<str>, value: Vec<u8>);
+	fn put(&self, key: Cow<str>, value: T);
 
 	/// Returns the value corresponding to this key.
 	// TODO: use higher-kinded stuff once stable to provide a more generic "accessor" for the data
-	fn get(&self, key: &str) -> Option<Vec<u8>>;
+	fn get(&self, key: &str) -> Option<T>;
 
 	/// Returns true if the datastore contains the given key.
 	fn has(&self, key: &str) -> bool;
 
 	/// Removes the given key from the datastore. Returns true if the key existed.
 	fn delete(&self, key: &str) -> bool;
 
 	/// Executes a query on the key-value store.
 	///
 	/// This operation is expensive on some implementations and cheap on others. It is your
 	/// responsibility to pick the right implementation for the right job.
@@ -56,6 +57,6 @@ pub trait Datastore {
 	fn query<'a>(
 		&'a self,
-		query: Query,
-	) -> Box<Stream<Item = (String, Vec<u8>), Error = IoError> + 'a>;
+		query: Query<T>,
+	) -> Box<Stream<Item = (String, T), Error = IoError> + 'a>;
 }
diff --git a/datastore/src/query.rs b/datastore/src/query.rs
index 2f57dc32..7af87752 100644
--- a/datastore/src/query.rs
+++ b/datastore/src/query.rs
@@ -31,11 +31,11 @@ use std::vec::IntoIter as VecIntoIter;
 /// The various modifications of the dataset are applied in the same order as the fields (prefix,
 /// filters, orders, skip, limit).
 #[derive(Debug, Clone)]
-pub struct Query<'a> {
+pub struct Query<'a, T: 'a> {
 	/// Only the keys that start with `prefix` will be returned.
 	pub prefix: Cow<'a, str>,
 	/// Filters to apply on the results.
-	pub filters: Vec<Filter<'a>>,
+	pub filters: Vec<Filter<'a, T>>,
 	/// How to order the keys. Applied sequentially.
 	pub orders: Vec<Order>,
 	/// Number of elements to skip from at the start of the results.
@@ -48,30 +48,29 @@
 
 /// A filter to apply to the results set.
 #[derive(Debug, Clone)]
-pub struct Filter<'a> {
+pub struct Filter<'a, T: 'a> {
 	/// Type of filter and value to compare with.
-	pub ty: FilterTy<'a>,
+	pub ty: FilterTy<'a, T>,
 	/// Comparison operation.
 	pub operation: FilterOp,
 }
 
 /// Type of filter and value to compare with.
 #[derive(Debug, Clone)]
-pub enum FilterTy<'a> {
+pub enum FilterTy<'a, T: 'a> {
 	/// Compare the key with a reference value.
 	KeyCompare(Cow<'a, str>),
 	/// Compare the value with a reference value.
-	ValueCompare(Cow<'a, [u8]>),
+	ValueCompare(&'a T),
 }
 
-/// Filtering operation. Keep in mind that anything else than `Equal` and `NotEqual` is a bit
-/// blurry.
+/// Filtering operation.
 #[derive(Debug, Copy, Clone)]
 pub enum FilterOp {
-	Less,
-	LessOrEqual,
 	Equal,
 	NotEqual,
+	Less,
+	LessOrEqual,
 	Greater,
 	GreaterOrEqual,
 }
@@ -90,9 +89,10 @@ pub enum Order {
 }
 
 /// Naively applies a query on a set of results.
-pub fn naive_apply_query<'a, S>(stream: S, query: Query<'a>)
-	-> StreamTake<StreamSkip<NaiveKeysOnlyApply<NaiveApplyOrdered<'a, NaiveFiltersApply<'a, NaivePrefixApply<'a, S>, VecIntoIter<Filter<'a>>>>>>>
-	where S: Stream<Item = (String, Vec<u8>), Error = IoError> + 'a
+pub fn naive_apply_query<'a, S, V>(stream: S, query: Query<'a, V>)
+	-> StreamTake<StreamSkip<NaiveKeysOnlyApply<NaiveApplyOrdered<'a, NaiveFiltersApply<'a, NaivePrefixApply<'a, S>, VecIntoIter<Filter<'a, V>>>, V>>>>
+	where S: Stream<Item = (String, V), Error = IoError> + 'a,
+	      V: Clone + Ord + Default + 'static
 {
 	let prefixed = naive_apply_prefix(stream, query.prefix);
 	let filtered = naive_apply_filters(prefixed, query.filters.into_iter());
@@ -103,18 +103,18 @@
 
 /// Skips the `skip` first element of a stream and only returns `limit` elements.
 #[inline]
-pub fn naive_apply_skip_limit<S>(stream: S, skip: u64, limit: u64) -> StreamTake<StreamSkip<S>>
+pub fn naive_apply_skip_limit<S, T>(stream: S, skip: u64, limit: u64) -> StreamTake<StreamSkip<S>>
 where
-	S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+	S: Stream<Item = (String, T), Error = IoError>,
 {
 	stream.skip(skip).take(limit)
 }
 
 /// Filters the result of a stream to empty values if `keys_only` is true.
 #[inline]
-pub fn naive_apply_keys_only<S>(stream: S, keys_only: bool) -> NaiveKeysOnlyApply<S>
+pub fn naive_apply_keys_only<S, T>(stream: S, keys_only: bool) -> NaiveKeysOnlyApply<S>
 where
-	S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+	S: Stream<Item = (String, T), Error = IoError>,
 {
 	NaiveKeysOnlyApply {
 		keys_only: keys_only,
 		stream: stream,
 	}
 }
 
/// Returned by `naive_apply_keys_only`.
 #[derive(Debug, Clone)]
 pub struct NaiveKeysOnlyApply<S> {
 	keys_only: bool,
 	stream: S,
 }
 
@@ -129,18 +129,19 @@
-impl<S> Stream for NaiveKeysOnlyApply<S>
+impl<S, T> Stream for NaiveKeysOnlyApply<S>
 where
-	S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+	S: Stream<Item = (String, T), Error = IoError>,
+	T: Default
 {
-	type Item = (String, Vec<u8>);
+	type Item = (String, T);
 	type Error = IoError;
 
 	#[inline]
 	fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
 		if self.keys_only {
 			Ok(Async::Ready(try_ready!(self.stream.poll()).map(|mut v| {
-				v.1 = Vec::new();
+				v.1 = Default::default();
 				v
 			})))
 		} else {
@@ -151,9 +152,9 @@
 
 /// Filters the result of a stream to only keep the results with a prefix.
 #[inline]
-pub fn naive_apply_prefix<'a, S>(stream: S, prefix: Cow<'a, str>) -> NaivePrefixApply<'a, S>
+pub fn naive_apply_prefix<'a, S, T>(stream: S, prefix: Cow<'a, str>) -> NaivePrefixApply<'a, S>
 where
-	S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+	S: Stream<Item = (String, T), Error = IoError>,
 {
 	NaivePrefixApply {
 		prefix: prefix,
@@ -168,11 +169,11 @@ pub struct NaivePrefixApply<'a, S> {
 	stream: S,
 }
 
-impl<'a, S> Stream for NaivePrefixApply<'a, S>
+impl<'a, S, T> Stream for NaivePrefixApply<'a, S>
 where
-	S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+	S: Stream<Item = (String, T), Error = IoError>,
 {
-	type Item = (String, Vec<u8>);
+	type Item = (String, T);
 	type Error = IoError;
 
 	#[inline]
@@ -193,11 +194,12 @@
 
 /// Applies orderings on the stream data. Will simply pass data through if the list of orderings
 /// is empty. Otherwise will need to collect.
-pub fn naive_apply_ordered<'a, S, I>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S>
+pub fn naive_apply_ordered<'a, S, I, V>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S, V>
 where
-	S: Stream<Item = (String, Vec<u8>), Error = IoError> + 'a,
+	S: Stream<Item = (String, V), Error = IoError> + 'a,
 	I: IntoIterator<Item = Order>,
 	I::IntoIter: 'a,
+	V: Ord + 'static,
 {
 	let orders_iter = orders_iter.into_iter();
 	if orders_iter.size_hint().1 == Some(0) {
@@ -231,20 +233,20 @@
 }
 
 /// Returned by `naive_apply_ordered`.
-pub struct NaiveApplyOrdered<'a, S> {
-	inner: NaiveApplyOrderedInner<'a, S>,
+pub struct NaiveApplyOrdered<'a, S, T> {
+	inner: NaiveApplyOrderedInner<'a, S, T>,
 }
 
-enum NaiveApplyOrderedInner<'a, S> {
+enum NaiveApplyOrderedInner<'a, S, T> {
 	PassThrough(S),
-	Collected(Box<Stream<Item = (String, Vec<u8>), Error = IoError> + 'a>),
+	Collected(Box<Stream<Item = (String, T), Error = IoError> + 'a>),
 }
 
-impl<'a, S> Stream for NaiveApplyOrdered<'a, S>
+impl<'a, S, V> Stream for NaiveApplyOrdered<'a, S, V>
 where
-	S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+	S: Stream<Item = (String, V), Error = IoError>,
 {
-	type Item = (String, Vec<u8>);
+	type Item = (String, V);
 	type Error = IoError;
 
 	#[inline]
@@ -258,10 +260,11 @@
 
 /// Filters the result of a stream to apply a set of filters.
 #[inline]
-pub fn naive_apply_filters<'a, S, I>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I>
+pub fn naive_apply_filters<'a, S, I, V>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I>
 where
-	S: Stream<Item = (String, Vec<u8>), Error = IoError>,
-	I: Iterator<Item = Filter<'a>> + Clone,
+	S: Stream<Item = (String, V), Error = IoError>,
+	I: Iterator<Item = Filter<'a, V>> + Clone,
+	V: 'a
 {
 	NaiveFiltersApply {
 		filters: filters,
 		stream: stream,
 		marker: PhantomData,
 	}
 }
 
/// Returned by `naive_apply_prefix`.
 #[derive(Debug, Clone)]
 pub struct NaiveFiltersApply<'a, S, I> {
 	filters: I,
 	stream: S,
 	marker: PhantomData<&'a ()>,
 }
 
@@ -278,15 +281,16 @@
-impl<'a, S, I> Stream for NaiveFiltersApply<'a, S, I>
+impl<'a, S, I, T> Stream for NaiveFiltersApply<'a, S, I>
 where
 	S: Stream<
-		Item = (String, Vec<u8>),
+		Item = (String, T),
 		Error = IoError,
 	>,
-	I: Iterator<Item = Filter<'a>> + Clone,
+	I: Iterator<Item = Filter<'a, T>> + Clone,
+	T: Ord + 'a,
 {
-	type Item = (String, Vec<u8>);
+	type Item = (String, T);
 	type Error = IoError;
 
 	#[inline]
@@ -309,14 +313,16 @@
 }
 
 #[inline]
-fn naive_filter_test(entry: &(String, Vec<u8>), filter: &Filter) -> bool {
+fn naive_filter_test<T>(entry: &(String, T), filter: &Filter<T>) -> bool
+	where T: Ord
+{
 	let (expected_ordering, revert_expected) = match filter.operation {
+		FilterOp::Equal => (Ordering::Equal, false),
+		FilterOp::NotEqual => (Ordering::Equal, true),
 		FilterOp::Less => (Ordering::Less, false),
-		FilterOp::LessOrEqual => (Ordering::Greater, true),
-		FilterOp::Equal => (Ordering::Less, false),
-		FilterOp::NotEqual => (Ordering::Less, true),
-		FilterOp::Greater => (Ordering::Greater, false),
 		FilterOp::GreaterOrEqual => (Ordering::Less, true),
+		FilterOp::Greater => (Ordering::Greater, false),
+		FilterOp::LessOrEqual => (Ordering::Greater, true),
 	};
 
 	match filter.ty {
@@ -324,7 +330,7 @@ fn naive_filter_test(entry: &(String, Vec<u8>), filter: &Filter) -> bool {
 		FilterTy::KeyCompare(ref ref_value) => {
 			((&*entry.0).cmp(&**ref_value) == expected_ordering) != revert_expected
 		}
 		FilterTy::ValueCompare(ref ref_value) => {
-			((&*entry.1).cmp(&**ref_value) == expected_ordering) != revert_expected
+			(entry.1.cmp(&**ref_value) == expected_ordering) != revert_expected
 		}
 	}
 }
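Usage note (not part of the series): with `Datastore<T>` now generic, any value type that satisfies
the bounds of the `JsonFileDatastore` impl can be persisted. A minimal sketch under assumptions —
the crate is linked as `datastore`, `serde_derive` is available, and `Point` plus the file path are
hypothetical names for illustration:

    extern crate datastore;
    extern crate serde;
    #[macro_use]
    extern crate serde_derive;

    use datastore::{Datastore, JsonFileDatastore};

    // Hypothetical value type; any Clone + Serialize + DeserializeOwned
    // + Default + Ord + 'static type satisfies the `Datastore` impl's bounds.
    #[derive(Clone, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
    struct Point {
        x: u32,
        y: u32,
    }

    fn main() {
        // Values live in memory and are written out on `flush()` (or on drop,
        // where write errors are silently ignored).
        let store = JsonFileDatastore::<Point>::new("/tmp/points.json").unwrap();
        store.put("origin".into(), Point { x: 0, y: 0 });
        assert!(store.has("origin"));
        store.flush().unwrap();
    }
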
From 6c39bb5f42a801c393fa81e78fe95c42a93be8f5 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Wed, 15 Nov 2017 17:27:57 +0100
Subject: [PATCH 2/2] Run rustfmt on the code

---
 datastore/src/json_file.rs | 412 ++++++++++++++++++-------------------
 datastore/src/lib.rs       |  60 +++---
 datastore/src/query.rs     | 402 +++++++++++++++++-------------------
 3 files changed, 425 insertions(+), 449 deletions(-)

diff --git a/datastore/src/json_file.rs b/datastore/src/json_file.rs
index 1a2087e5..da847e6e 100644
--- a/datastore/src/json_file.rs
+++ b/datastore/src/json_file.rs
@@ -1,26 +1,27 @@
 // Copyright 2017 Parity Technologies (UK) Ltd.
-// 
-// Permission is hereby granted, free of charge, to any person obtaining a 
-// copy of this software and associated documentation files (the "Software"), 
-// to deal in the Software without restriction, including without limitation 
-// the rights to use, copy, modify, merge, publish, distribute, sublicense, 
-// and/or sell copies of the Software, and to permit persons to whom the 
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
 // Software is furnished to do so, subject to the following conditions:
 //
-// The above copyright notice and this permission notice shall be included in 
+// The above copyright notice and this permission notice shall be included in
 // all copies or substantial portions of the Software.
 //
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
-// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
 use Datastore;
 use futures::Future;
 use futures::stream::{Stream, iter_ok};
+use parking_lot::Mutex;
 use query::{Query, naive_apply_query};
 use serde::Serialize;
 use serde::de::DeserializeOwned;
@@ -34,246 +35,237 @@ use std::io::ErrorKind as IoErrorKind;
 use std::io::Read;
 use std::path::PathBuf;
-use parking_lot::Mutex;
 use tempfile::NamedTempFile;
 
 /// Implementation of `Datastore` that uses a single plain JSON file.
 pub struct JsonFileDatastore<T>
-	where T: Serialize + DeserializeOwned
+    where T: Serialize + DeserializeOwned
 {
-	path: PathBuf,
-	content: Mutex<HashMap<String, T>>,
+    path: PathBuf,
+    content: Mutex<HashMap<String, T>>,
 }
 
 impl<T> JsonFileDatastore<T>
-	where T: Serialize + DeserializeOwned
+    where T: Serialize + DeserializeOwned
 {
-	/// Opens or creates the datastore. If the path refers to an existing path, then this function
-	/// will attempt to load an existing set of values from it (which can result in an error).
-	/// Otherwise if the path doesn't exist, a new empty datastore will be created.
-	pub fn new<P>(path: P) -> Result<JsonFileDatastore<T>, IoError>
-	where
-		P: Into<PathBuf>,
-	{
-		let path = path.into();
+    /// Opens or creates the datastore. If the path refers to an existing path, then this function
+    /// will attempt to load an existing set of values from it (which can result in an error).
+    /// Otherwise if the path doesn't exist, a new empty datastore will be created.
+    pub fn new<P>(path: P) -> Result<JsonFileDatastore<T>, IoError>
+        where P: Into<PathBuf>
+    {
+        let path = path.into();
 
-		if !path.exists() {
-			return Ok(JsonFileDatastore {
-				path: path,
-				content: Mutex::new(HashMap::new()),
-			});
-		}
+        if !path.exists() {
+            return Ok(JsonFileDatastore {
+                path: path,
+                content: Mutex::new(HashMap::new()),
+            });
+        }
 
         let content = {
             let mut file = fs::File::open(&path)?;
 
             // We want to support empty files (and treat them as an empty recordset). Unfortunately
             // `serde_json` will always produce an error if we do this ("unexpected EOF at line 0
             // column 0"). Therefore we start by reading one byte from the file in order to check
             // for EOF.
 
             let mut first_byte = [0];
             if file.read(&mut first_byte)? == 0 {
                 // File is empty.
                 HashMap::new()
             } else {
                 match from_reader::<_, Value>(Cursor::new(first_byte).chain(file)) {
                     Ok(Value::Null) => HashMap::new(),
                     Ok(Value::Object(map)) => {
                         let mut out = HashMap::with_capacity(map.len());
                         for (key, value) in map.into_iter() {
                             let value = match from_value(value) {
                                 Ok(v) => v,
                                 Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)),
                             };
                             out.insert(key, value);
                         }
                         out
                     }
                     Ok(_) => {
                         return Err(IoError::new(IoErrorKind::InvalidData, "expected JSON object"));
                     }
                     Err(err) => {
                         return Err(IoError::new(IoErrorKind::InvalidData, err));
                     }
                 }
             }
         };
 
         Ok(JsonFileDatastore {
             path: path,
             content: Mutex::new(content),
         })
     }
 
     /// Flushes the content of the datastore to the disk.
     ///
     /// This function can only fail in case of a disk access error. If an error occurs, any change
     /// to the datastore that was performed since the last successful flush will be lost. No data
     /// will be corrupted.
+    pub fn flush(&self) -> Result<(), IoError> {
+        // Create a temporary file in the same directory as the destination, which avoids the
+        // problem of having a file cleaner delete our file while we use it.
+        let self_path_parent = self.path
+            .parent()
+            .ok_or(IoError::new(
+                IoErrorKind::Other,
+                "couldn't get parent directory of destination",
+            ))?;
+        let mut temporary_file = NamedTempFile::new_in(self_path_parent)?;
 
         let content = self.content.lock();
+        to_writer(
+            &mut temporary_file,
+            &content.iter().map(|(k, v)| (k.clone(), to_value(v).unwrap())).collect::<Map<_, _>>(),
+        )?; // TODO: panic!
+        temporary_file.sync_data()?;
 
         // Note that `persist` will fail if we try to persist across filesystems. However that
         // shouldn't happen since we created the temporary file in the same directory as the final
         // path.
         temporary_file.persist(&self.path)?;
         Ok(())
     }
 }
 
 impl<T> Datastore<T> for JsonFileDatastore<T>
-	where T: Clone + Serialize + DeserializeOwned + Default + Ord + 'static
+    where T: Clone + Serialize + DeserializeOwned + Default + Ord + 'static
 {
     #[inline]
     fn put(&self, key: Cow<str>, value: T) {
         let mut content = self.content.lock();
         content.insert(key.into_owned(), value);
     }
 
     fn get(&self, key: &str) -> Option<T> {
         let content = self.content.lock();
         // If the JSON is malformed, we just ignore the value.
         content.get(key).cloned()
     }
 
     fn has(&self, key: &str) -> bool {
         let content = self.content.lock();
         content.contains_key(key)
     }
 
     fn delete(&self, key: &str) -> bool {
         let mut content = self.content.lock();
         content.remove(key).is_some()
     }
 
     fn query<'a>(
         &'a self,
         query: Query<T>,
     ) -> Box<Stream<Item = (String, T), Error = IoError> + 'a> {
         let content = self.content.lock();
 
         let keys_only = query.keys_only;
 
         let content_stream = iter_ok(content.iter().filter_map(|(key, value)| {
             // Skip values that are malformed.
-			let value = if keys_only {
-				Default::default()
-			} else {
-				value.clone()
-			};
+            let value = if keys_only { Default::default() } else { value.clone() };
 
             Some((key.clone(), value))
         }));
 
         // `content_stream` reads from the content of the `Mutex`, so we need to clone the data
         // into a `Vec` before returning.
-		let collected = naive_apply_query(content_stream, query)
-			.collect()
-			.wait()
-			.expect("can only fail if either `naive_apply_query` or `content_stream` produce \
-			         an error, which can't happen");
-		let output_stream = iter_ok(collected.into_iter());
-		Box::new(output_stream) as Box<_>
+        let collected = naive_apply_query(content_stream, query)
+            .collect()
+            .wait()
+            .expect("can only fail if either `naive_apply_query` or `content_stream` produce \
+                     an error, which can't happen");
+        let output_stream = iter_ok(collected.into_iter());
+        Box::new(output_stream) as Box<_>
     }
 }
 
 impl<T> Drop for JsonFileDatastore<T>
-	where T: Serialize + DeserializeOwned
+    where T: Serialize + DeserializeOwned
 {
     #[inline]
     fn drop(&mut self) {
         // Unfortunately there's not much we can do here in case of an error, as panicking would be
         // very bad. Similar to `File`, the user should take care to call `flush()` before dropping
         // the datastore.
         //
         // If an error happens here, any change since the last successful flush will be lost, but
         // the data will not be corrupted.
         let _ = self.flush();
     }
 }
 
 #[cfg(test)]
 mod tests {
-	use {Query, Order, Filter, FilterTy, FilterOp};
-	use Datastore;
-	use JsonFileDatastore;
-	use futures::{Future, Stream};
-	use tempfile::NamedTempFile;
+    use {Query, Order, Filter, FilterTy, FilterOp};
+    use Datastore;
+    use JsonFileDatastore;
+    use futures::{Future, Stream};
+    use tempfile::NamedTempFile;
 
-	#[test]
-	fn open_and_flush() {
-		let temp_file = NamedTempFile::new().unwrap();
-		let datastore = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
-		datastore.flush().unwrap();
-	}
+    #[test]
+    fn open_and_flush() {
+        let temp_file = NamedTempFile::new().unwrap();
+        let datastore = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
+        datastore.flush().unwrap();
+    }
 
-	#[test]
-	fn values_store_and_reload() {
-		let temp_file = NamedTempFile::new().unwrap();
-
-		let datastore = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
-		datastore.put("foo".into(), vec![1, 2, 3]);
-		datastore.put("bar".into(), vec![0, 255, 127]);
-		datastore.flush().unwrap();
-		drop(datastore);
-
-		let reload = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
-		assert_eq!(reload.get("bar").unwrap(), &[0, 255, 127]);
-		assert_eq!(reload.get("foo").unwrap(), &[1, 2, 3]);
-	}
+    #[test]
+    fn values_store_and_reload() {
+        let temp_file = NamedTempFile::new().unwrap();
+
+        let datastore = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
+        datastore.put("foo".into(), vec![1, 2, 3]);
+        datastore.put("bar".into(), vec![0, 255, 127]);
+        datastore.flush().unwrap();
+        drop(datastore);
+
+        let reload = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
+        assert_eq!(reload.get("bar").unwrap(), &[0, 255, 127]);
+        assert_eq!(reload.get("foo").unwrap(), &[1, 2, 3]);
+    }
 
-	#[test]
-	fn query_basic() {
-		let temp_file = NamedTempFile::new().unwrap();
-
-		let datastore =
-			JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
-		datastore.put("foo1".into(), vec![6, 7, 8]);
-		datastore.put("foo2".into(), vec![6, 7, 8]);
-		datastore.put("foo3".into(), vec![7, 8, 9]);
-		datastore.put("foo4".into(), vec![10, 11, 12]);
-		datastore.put("foo5".into(), vec![13, 14, 15]);
-		datastore.put("bar1".into(), vec![0, 255, 127]);
-		datastore.flush().unwrap();
-
-		let query = datastore
-			.query(Query {
-				prefix: "fo".into(),
-				filters: vec![
-					Filter {
-						ty: FilterTy::ValueCompare(&vec![6, 7, 8].into()),
-						operation: FilterOp::NotEqual,
-					},
-				],
-				orders: vec![Order::ByKeyDesc],
-				skip: 1,
-				limit: u64::max_value(),
-				keys_only: false,
-			})
-			.collect()
-			.wait()
-			.unwrap();
+    #[test]
+    fn query_basic() {
+        let temp_file = NamedTempFile::new().unwrap();
+
+        let datastore = JsonFileDatastore::<Vec<u8>>::new(temp_file.path()).unwrap();
+        datastore.put("foo1".into(), vec![6, 7, 8]);
+        datastore.put("foo2".into(), vec![6, 7, 8]);
+        datastore.put("foo3".into(), vec![7, 8, 9]);
+        datastore.put("foo4".into(), vec![10, 11, 12]);
+        datastore.put("foo5".into(), vec![13, 14, 15]);
+        datastore.put("bar1".into(), vec![0, 255, 127]);
+        datastore.flush().unwrap();
+
+        let query = datastore.query(Query {
+                prefix: "fo".into(),
+                filters: vec![
+                    Filter {
+                        ty: FilterTy::ValueCompare(&vec![6, 7, 8].into()),
+                        operation: FilterOp::NotEqual,
+                    },
+                ],
+                orders: vec![Order::ByKeyDesc],
+                skip: 1,
+                limit: u64::max_value(),
+                keys_only: false,
+            })
+            .collect()
+            .wait()
+            .unwrap();
 
-		assert_eq!(query[0].0, "foo4");
-		assert_eq!(query[0].1, &[10, 11, 12]);
-		assert_eq!(query[1].0, "foo3");
-		assert_eq!(query[1].1, &[7, 8, 9]);
-	}
+        assert_eq!(query[0].0, "foo4");
+        assert_eq!(query[0].1, &[10, 11, 12]);
+        assert_eq!(query[1].0, "foo3");
+        assert_eq!(query[1].1, &[7, 8, 9]);
+    }
 }
diff --git a/datastore/src/lib.rs b/datastore/src/lib.rs
index 85281013..57fb50e7 100644
--- a/datastore/src/lib.rs
+++ b/datastore/src/lib.rs
@@ -1,21 +1,21 @@
 // Copyright 2017 Parity Technologies (UK) Ltd.
-// 
-// Permission is hereby granted, free of charge, to any person obtaining a 
-// copy of this software and associated documentation files (the "Software"), 
-// to deal in the Software without restriction, including without limitation 
-// the rights to use, copy, modify, merge, publish, distribute, sublicense, 
-// and/or sell copies of the Software, and to permit persons to whom the 
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
 // Software is furnished to do so, subject to the following conditions:
 //
-// The above copyright notice and this permission notice shall be included in 
+// The above copyright notice and this permission notice shall be included in
 // all copies or substantial portions of the Software.
 //
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
-// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
 extern crate base64;
@@ -38,25 +38,25 @@ pub use self::query::{Query, Order, Filter, FilterTy, FilterOp};
 
 /// Abstraction over any struct that can store `(key, value)` pairs.
 pub trait Datastore<T> {
-	/// Sets the value of a key.
-	fn put(&self, key: Cow<str>, value: T);
+    /// Sets the value of a key.
+    fn put(&self, key: Cow<str>, value: T);
 
-	/// Returns the value corresponding to this key.
-	// TODO: use higher-kinded stuff once stable to provide a more generic "accessor" for the data
-	fn get(&self, key: &str) -> Option<T>;
+    /// Returns the value corresponding to this key.
+    // TODO: use higher-kinded stuff once stable to provide a more generic "accessor" for the data
+    fn get(&self, key: &str) -> Option<T>;
 
-	/// Returns true if the datastore contains the given key.
-	fn has(&self, key: &str) -> bool;
+    /// Returns true if the datastore contains the given key.
+    fn has(&self, key: &str) -> bool;
 
-	/// Removes the given key from the datastore. Returns true if the key existed.
-	fn delete(&self, key: &str) -> bool;
+    /// Removes the given key from the datastore. Returns true if the key existed.
+    fn delete(&self, key: &str) -> bool;
 
-	/// Executes a query on the key-value store.
-	///
-	/// This operation is expensive on some implementations and cheap on others. It is your
-	/// responsibility to pick the right implementation for the right job.
-	fn query<'a>(
-		&'a self,
-		query: Query<T>,
-	) -> Box<Stream<Item = (String, T), Error = IoError> + 'a>;
+    /// Executes a query on the key-value store.
+    ///
+    /// This operation is expensive on some implementations and cheap on others. It is your
+    /// responsibility to pick the right implementation for the right job.
+    fn query<'a>(
+        &'a self,
+        query: Query<T>,
+    ) -> Box<Stream<Item = (String, T), Error = IoError> + 'a>;
 }
diff --git a/datastore/src/query.rs b/datastore/src/query.rs
index 7af87752..a83c353b 100644
--- a/datastore/src/query.rs
+++ b/datastore/src/query.rs
@@ -1,21 +1,21 @@
 // Copyright 2017 Parity Technologies (UK) Ltd.
-// 
-// Permission is hereby granted, free of charge, to any person obtaining a 
-// copy of this software and associated documentation files (the "Software"), 
-// to deal in the Software without restriction, including without limitation 
-// the rights to use, copy, modify, merge, publish, distribute, sublicense, 
-// and/or sell copies of the Software, and to permit persons to whom the 
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
 // Software is furnished to do so, subject to the following conditions:
 //
-// The above copyright notice and this permission notice shall be included in 
+// The above copyright notice and this permission notice shall be included in
 // all copies or substantial portions of the Software.
 //
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
-// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
 use futures::{Stream, Future, Async, Poll};
@@ -32,305 +32,289 @@
 /// filters, orders, skip, limit).
 #[derive(Debug, Clone)]
 pub struct Query<'a, T: 'a> {
-	/// Only the keys that start with `prefix` will be returned.
-	pub prefix: Cow<'a, str>,
-	/// Filters to apply on the results.
-	pub filters: Vec<Filter<'a, T>>,
-	/// How to order the keys. Applied sequentially.
-	pub orders: Vec<Order>,
-	/// Number of elements to skip from at the start of the results.
-	pub skip: u64,
-	/// Maximum number of elements in the results.
-	pub limit: u64,
-	/// Only return keys. If true, then all the `Vec`s of the data will be empty.
-	pub keys_only: bool,
+    /// Only the keys that start with `prefix` will be returned.
+    pub prefix: Cow<'a, str>,
+    /// Filters to apply on the results.
+    pub filters: Vec<Filter<'a, T>>,
+    /// How to order the keys. Applied sequentially.
+    pub orders: Vec<Order>,
+    /// Number of elements to skip from at the start of the results.
+    pub skip: u64,
+    /// Maximum number of elements in the results.
+    pub limit: u64,
+    /// Only return keys. If true, then all the `Vec`s of the data will be empty.
+    pub keys_only: bool,
 }
 
 /// A filter to apply to the results set.
 #[derive(Debug, Clone)]
 pub struct Filter<'a, T: 'a> {
-	/// Type of filter and value to compare with.
-	pub ty: FilterTy<'a, T>,
-	/// Comparison operation.
-	pub operation: FilterOp,
+    /// Type of filter and value to compare with.
+    pub ty: FilterTy<'a, T>,
+    /// Comparison operation.
+    pub operation: FilterOp,
 }
 
 /// Type of filter and value to compare with.
 #[derive(Debug, Clone)]
 pub enum FilterTy<'a, T: 'a> {
-	/// Compare the key with a reference value.
-	KeyCompare(Cow<'a, str>),
-	/// Compare the value with a reference value.
-	ValueCompare(&'a T),
+    /// Compare the key with a reference value.
+    KeyCompare(Cow<'a, str>),
+    /// Compare the value with a reference value.
+    ValueCompare(&'a T),
 }
 
 /// Filtering operation.
 #[derive(Debug, Copy, Clone)]
 pub enum FilterOp {
-	Equal,
-	NotEqual,
-	Less,
-	LessOrEqual,
-	Greater,
-	GreaterOrEqual,
+    Equal,
+    NotEqual,
+    Less,
+    LessOrEqual,
+    Greater,
+    GreaterOrEqual,
 }
 
 /// Order in which to sort the results of a query.
 #[derive(Debug, Copy, Clone)]
 pub enum Order {
-	/// Put the values in ascending order.
-	ByValueAsc,
-	/// Put the values in descending order.
-	ByValueDesc,
-	/// Put the keys in ascending order.
-	ByKeyAsc,
-	/// Put the keys in descending order.
-	ByKeyDesc,
+    /// Put the values in ascending order.
+    ByValueAsc,
+    /// Put the values in descending order.
+    ByValueDesc,
+    /// Put the keys in ascending order.
+    ByKeyAsc,
+    /// Put the keys in descending order.
+    ByKeyDesc,
 }
 
 /// Naively applies a query on a set of results.
 pub fn naive_apply_query<'a, S, V>(stream: S, query: Query<'a, V>)
-	-> StreamTake<StreamSkip<NaiveKeysOnlyApply<NaiveApplyOrdered<'a, NaiveFiltersApply<'a, NaivePrefixApply<'a, S>, VecIntoIter<Filter<'a, V>>>, V>>>>
-	where S: Stream<Item = (String, V), Error = IoError> + 'a,
-	      V: Clone + Ord + Default + 'static
+    -> StreamTake<StreamSkip<NaiveKeysOnlyApply<NaiveApplyOrdered<'a, NaiveFiltersApply<'a, NaivePrefixApply<'a, S>, VecIntoIter<Filter<'a, V>>>, V>>>>
+    where S: Stream<Item = (String, V), Error = IoError> + 'a,
+          V: Clone + Ord + Default + 'static
 {
-	let prefixed = naive_apply_prefix(stream, query.prefix);
-	let filtered = naive_apply_filters(prefixed, query.filters.into_iter());
-	let ordered = naive_apply_ordered(filtered, query.orders);
-	let keys_only = naive_apply_keys_only(ordered, query.keys_only);
-	naive_apply_skip_limit(keys_only, query.skip, query.limit)
+    let prefixed = naive_apply_prefix(stream, query.prefix);
+    let filtered = naive_apply_filters(prefixed, query.filters.into_iter());
+    let ordered = naive_apply_ordered(filtered, query.orders);
+    let keys_only = naive_apply_keys_only(ordered, query.keys_only);
+    naive_apply_skip_limit(keys_only, query.skip, query.limit)
 }
 
 /// Skips the `skip` first element of a stream and only returns `limit` elements.
 #[inline]
 pub fn naive_apply_skip_limit<S, T>(stream: S, skip: u64, limit: u64) -> StreamTake<StreamSkip<S>>
-where
-	S: Stream<Item = (String, T), Error = IoError>,
+    where S: Stream<Item = (String, T), Error = IoError>
 {
     stream.skip(skip).take(limit)
 }
 
 /// Filters the result of a stream to empty values if `keys_only` is true.
 #[inline]
 pub fn naive_apply_keys_only<S, T>(stream: S, keys_only: bool) -> NaiveKeysOnlyApply<S>
-where
-	S: Stream<Item = (String, T), Error = IoError>,
+    where S: Stream<Item = (String, T), Error = IoError>
 {
-	NaiveKeysOnlyApply {
-		keys_only: keys_only,
-		stream: stream,
-	}
+    NaiveKeysOnlyApply {
+        keys_only: keys_only,
+        stream: stream,
+    }
 }
 
 /// Returned by `naive_apply_keys_only`.
 #[derive(Debug, Clone)]
 pub struct NaiveKeysOnlyApply<S> {
-	keys_only: bool,
-	stream: S,
+    keys_only: bool,
+    stream: S,
 }
 
 impl<S, T> Stream for NaiveKeysOnlyApply<S>
-where
-	S: Stream<Item = (String, T), Error = IoError>,
-	T: Default
+    where S: Stream<Item = (String, T), Error = IoError>,
+          T: Default
 {
     type Item = (String, T);
     type Error = IoError;
 
-	#[inline]
-	fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
-		if self.keys_only {
-			Ok(Async::Ready(try_ready!(self.stream.poll()).map(|mut v| {
-				v.1 = Default::default();
-				v
-			})))
-		} else {
-			self.stream.poll()
-		}
-	}
+    #[inline]
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        if self.keys_only {
+            Ok(Async::Ready(try_ready!(self.stream.poll()).map(|mut v| {
+                v.1 = Default::default();
+                v
+            })))
+        } else {
+            self.stream.poll()
+        }
+    }
 }
 
 /// Filters the result of a stream to only keep the results with a prefix.
 #[inline]
 pub fn naive_apply_prefix<'a, S, T>(stream: S, prefix: Cow<'a, str>) -> NaivePrefixApply<'a, S>
-where
-	S: Stream<Item = (String, T), Error = IoError>,
+    where S: Stream<Item = (String, T), Error = IoError>
 {
-	NaivePrefixApply {
-		prefix: prefix,
-		stream: stream,
-	}
+    NaivePrefixApply { prefix: prefix, stream: stream }
 }
 
 /// Returned by `naive_apply_prefix`.
 #[derive(Debug, Clone)]
 pub struct NaivePrefixApply<'a, S> {
-	prefix: Cow<'a, str>,
-	stream: S,
+    prefix: Cow<'a, str>,
+    stream: S,
 }
 
 impl<'a, S, T> Stream for NaivePrefixApply<'a, S>
-where
-	S: Stream<Item = (String, T), Error = IoError>,
+    where S: Stream<Item = (String, T), Error = IoError>
 {
     type Item = (String, T);
     type Error = IoError;
 
-	#[inline]
-	fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
-		loop {
-			let item = try_ready!(self.stream.poll());
-			match item {
-				Some(i) => {
-					if i.0.starts_with(&*self.prefix) {
-						return Ok(Async::Ready(Some(i)));
-					}
-				}
-				None => return Ok(Async::Ready(None)),
-			}
-		}
-	}
+    #[inline]
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        loop {
+            let item = try_ready!(self.stream.poll());
+            match item {
+                Some(i) => {
+                    if i.0.starts_with(&*self.prefix) {
+                        return Ok(Async::Ready(Some(i)));
+                    }
+                }
+                None => return Ok(Async::Ready(None)),
+            }
+        }
+    }
 }
 
 /// Applies orderings on the stream data. Will simply pass data through if the list of orderings
 /// is empty. Otherwise will need to collect.
 pub fn naive_apply_ordered<'a, S, I, V>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S, V>
-where
-	S: Stream<Item = (String, V), Error = IoError> + 'a,
-	I: IntoIterator<Item = Order>,
-	I::IntoIter: 'a,
-	V: Ord + 'static,
+    where S: Stream<Item = (String, V), Error = IoError> + 'a,
+          I: IntoIterator<Item = Order>,
+          I::IntoIter: 'a,
+          V: Ord + 'static
 {
-	let orders_iter = orders_iter.into_iter();
-	if orders_iter.size_hint().1 == Some(0) {
-		return NaiveApplyOrdered { inner: NaiveApplyOrderedInner::PassThrough(stream) };
-	}
+    let orders_iter = orders_iter.into_iter();
+    if orders_iter.size_hint().1 == Some(0) {
+        return NaiveApplyOrdered { inner: NaiveApplyOrderedInner::PassThrough(stream) };
+    }
 
-	let collected = stream
-		.collect()
-		.and_then(move |mut collected| {
-			for order in orders_iter {
-				match order {
-					Order::ByValueAsc => {
-						collected.sort_by(|a, b| a.1.cmp(&b.1));
-					}
-					Order::ByValueDesc => {
-						collected.sort_by(|a, b| b.1.cmp(&a.1));
-					}
-					Order::ByKeyAsc => {
-						collected.sort_by(|a, b| a.0.cmp(&b.0));
-					}
-					Order::ByKeyDesc => {
-						collected.sort_by(|a, b| b.0.cmp(&a.0));
-					}
-				}
-			}
-			Ok(iter_ok(collected.into_iter()))
-		})
-		.flatten_stream();
+    let collected = stream.collect()
+        .and_then(move |mut collected| {
+            for order in orders_iter {
+                match order {
+                    Order::ByValueAsc => {
+                        collected.sort_by(|a, b| a.1.cmp(&b.1));
+                    }
+                    Order::ByValueDesc => {
+                        collected.sort_by(|a, b| b.1.cmp(&a.1));
+                    }
+                    Order::ByKeyAsc => {
+                        collected.sort_by(|a, b| a.0.cmp(&b.0));
+                    }
+                    Order::ByKeyDesc => {
+                        collected.sort_by(|a, b| b.0.cmp(&a.0));
+                    }
+                }
+            }
+            Ok(iter_ok(collected.into_iter()))
+        })
+        .flatten_stream();
 
-	NaiveApplyOrdered { inner: NaiveApplyOrderedInner::Collected(Box::new(collected)) }
+    NaiveApplyOrdered { inner: NaiveApplyOrderedInner::Collected(Box::new(collected)) }
 }
 
 /// Returned by `naive_apply_ordered`.
 pub struct NaiveApplyOrdered<'a, S, T> {
-	inner: NaiveApplyOrderedInner<'a, S, T>,
+    inner: NaiveApplyOrderedInner<'a, S, T>,
 }
 
 enum NaiveApplyOrderedInner<'a, S, T> {
-	PassThrough(S),
-	Collected(Box<Stream<Item = (String, T), Error = IoError> + 'a>),
+    PassThrough(S),
+    Collected(Box<Stream<Item = (String, T), Error = IoError> + 'a>),
 }
 
 impl<'a, S, V> Stream for NaiveApplyOrdered<'a, S, V>
-where
-	S: Stream<Item = (String, V), Error = IoError>,
+    where S: Stream<Item = (String, V), Error = IoError>
 {
     type Item = (String, V);
     type Error = IoError;
 
-	#[inline]
-	fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
-		match self.inner {
-			NaiveApplyOrderedInner::PassThrough(ref mut s) => s.poll(),
-			NaiveApplyOrderedInner::Collected(ref mut s) => s.poll(),
-		}
-	}
+    #[inline]
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        match self.inner {
+            NaiveApplyOrderedInner::PassThrough(ref mut s) => s.poll(),
+            NaiveApplyOrderedInner::Collected(ref mut s) => s.poll(),
+        }
+    }
 }
 
 /// Filters the result of a stream to apply a set of filters.
 #[inline]
 pub fn naive_apply_filters<'a, S, I, V>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I>
-where
-	S: Stream<Item = (String, V), Error = IoError>,
-	I: Iterator<Item = Filter<'a, V>> + Clone,
-	V: 'a
+    where S: Stream<Item = (String, V), Error = IoError>,
+          I: Iterator<Item = Filter<'a, V>> + Clone,
+          V: 'a
 {
-	NaiveFiltersApply {
-		filters: filters,
-		stream: stream,
-		marker: PhantomData,
-	}
+    NaiveFiltersApply {
+        filters: filters,
+        stream: stream,
+        marker: PhantomData,
+    }
 }
 
 /// Returned by `naive_apply_prefix`.
 #[derive(Debug, Clone)]
 pub struct NaiveFiltersApply<'a, S, I> {
-	filters: I,
-	stream: S,
-	marker: PhantomData<&'a ()>,
+    filters: I,
+    stream: S,
+    marker: PhantomData<&'a ()>,
 }
 
 impl<'a, S, I, T> Stream for NaiveFiltersApply<'a, S, I>
-where
-	S: Stream<
-		Item = (String, T),
-		Error = IoError,
-	>,
-	I: Iterator<Item = Filter<'a, T>> + Clone,
-	T: Ord + 'a,
+    where S: Stream<Item = (String, T), Error = IoError>,
+          I: Iterator<Item = Filter<'a, T>> + Clone,
+          T: Ord + 'a
 {
     type Item = (String, T);
     type Error = IoError;
 
-	#[inline]
-	fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
-		'outer: loop {
-			let item = try_ready!(self.stream.poll());
-			match item {
-				Some(i) => {
-					for filter in self.filters.clone() {
-						if !naive_filter_test(&i, &filter) {
-							continue 'outer;
-						}
-					}
-					return Ok(Async::Ready(Some(i)));
-				}
-				None => return Ok(Async::Ready(None)),
-			}
-		}
-	}
+    #[inline]
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        'outer: loop {
+            let item = try_ready!(self.stream.poll());
+            match item {
+                Some(i) => {
+                    for filter in self.filters.clone() {
+                        if !naive_filter_test(&i, &filter) {
+                            continue 'outer;
+                        }
+                    }
+                    return Ok(Async::Ready(Some(i)));
+                }
+                None => return Ok(Async::Ready(None)),
+            }
+        }
+    }
 }
 
 #[inline]
 fn naive_filter_test<T>(entry: &(String, T), filter: &Filter<T>) -> bool
-	where T: Ord
+    where T: Ord
 {
-	let (expected_ordering, revert_expected) = match filter.operation {
-		FilterOp::Equal => (Ordering::Equal, false),
-		FilterOp::NotEqual => (Ordering::Equal, true),
-		FilterOp::Less => (Ordering::Less, false),
-		FilterOp::GreaterOrEqual => (Ordering::Less, true),
-		FilterOp::Greater => (Ordering::Greater, false),
-		FilterOp::LessOrEqual => (Ordering::Greater, true),
-	};
+    let (expected_ordering, revert_expected) = match filter.operation {
+        FilterOp::Equal => (Ordering::Equal, false),
+        FilterOp::NotEqual => (Ordering::Equal, true),
+        FilterOp::Less => (Ordering::Less, false),
+        FilterOp::GreaterOrEqual => (Ordering::Less, true),
+        FilterOp::Greater => (Ordering::Greater, false),
+        FilterOp::LessOrEqual => (Ordering::Greater, true),
+    };
 
-	match filter.ty {
-		FilterTy::KeyCompare(ref ref_value) => {
-			((&*entry.0).cmp(&**ref_value) == expected_ordering) != revert_expected
-		}
-		FilterTy::ValueCompare(ref ref_value) => {
-			(entry.1.cmp(&**ref_value) == expected_ordering) != revert_expected
-		}
-	}
+    match filter.ty {
+        FilterTy::KeyCompare(ref ref_value) => {
+            ((&*entry.0).cmp(&**ref_value) == expected_ordering) != revert_expected
+        }
+        FilterTy::ValueCompare(ref ref_value) => {
+            (entry.1.cmp(&**ref_value) == expected_ordering) != revert_expected
+        }
+    }
 }
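
For reference, a minimal sketch of driving the reworked query pipeline after both patches. This is
illustrative only and not part of the series; the file path is hypothetical, and the field values
mirror the crate's own `query_basic` test:

    extern crate datastore;
    extern crate futures;

    use datastore::{Datastore, Filter, FilterOp, FilterTy, JsonFileDatastore, Order, Query};
    use futures::{Future, Stream};

    fn main() {
        let store = JsonFileDatastore::<Vec<u8>>::new("/tmp/demo.json").unwrap();
        store.put("foo1".into(), vec![1, 2, 3]);
        store.put("foo2".into(), vec![6, 7, 8]);

        // `ValueCompare` now borrows a reference value of the generic type
        // instead of a base64-encoded byte slice.
        let reference = vec![6, 7, 8];
        let results = store
            .query(Query {
                prefix: "foo".into(),
                filters: vec![
                    Filter {
                        ty: FilterTy::ValueCompare(&reference),
                        operation: FilterOp::NotEqual,
                    },
                ],
                orders: vec![Order::ByKeyDesc],
                skip: 0,
                limit: u64::max_value(),
                keys_only: false,
            })
            .collect()
            .wait()
            .unwrap();

        // "foo2" equals the reference value and is filtered out.
        assert_eq!(results, vec![("foo1".to_string(), vec![1, 2, 3])]);
    }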