From 1420e03c6ed6176726e249b237a6f955a27e7d41 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Mon, 6 Nov 2017 17:36:36 +0100
Subject: [PATCH 1/4] Initial implementation of datastore

---
 Cargo.toml                 |   1 +
 datastore/Cargo.toml       |  10 ++
 datastore/src/json_file.rs | 236 ++++++++++++++++++++++++++++
 datastore/src/lib.rs       |  40 +++++
 datastore/src/query.rs     | 311 +++++++++++++++++++++++++++++++++++++
 5 files changed, 598 insertions(+)
 create mode 100644 datastore/Cargo.toml
 create mode 100644 datastore/src/json_file.rs
 create mode 100644 datastore/src/lib.rs
 create mode 100644 datastore/src/query.rs

diff --git a/Cargo.toml b/Cargo.toml
index 6424a19c..79e61932 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,7 @@
 [workspace]
 members = [
     "multistream-select",
+    "datastore",
     "libp2p-host",
     "libp2p-transport",
     "libp2p-tcp-transport",
diff --git a/datastore/Cargo.toml b/datastore/Cargo.toml
new file mode 100644
index 00000000..eccf68e1
--- /dev/null
+++ b/datastore/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "datastore"
+version = "0.1.0"
+authors = ["pierre <pierre.krieger1708@gmail.com>"]
+
+[dependencies]
+base64 = "0.7"
+futures = "0.1"
+serde_json = "1.0"
+tempfile = "2.2"
diff --git a/datastore/src/json_file.rs b/datastore/src/json_file.rs
new file mode 100644
index 00000000..b9d2e7c9
--- /dev/null
+++ b/datastore/src/json_file.rs
@@ -0,0 +1,236 @@
+
+use Datastore;
+use base64;
+use futures::Future;
+use futures::stream::{Stream, iter_ok};
+use query::{Query, naive_apply_query};
+use serde_json::{from_reader, to_writer};
+use serde_json::map::Map;
+use serde_json::value::Value;
+use std::borrow::Cow;
+use std::fs;
+use std::io::Cursor;
+use std::io::Error as IoError;
+use std::io::ErrorKind as IoErrorKind;
+use std::io::Read;
+use std::path::PathBuf;
+use std::sync::Mutex;
+use tempfile::NamedTempFile;
+
+/// Implementation of `Datastore` that uses a single plain JSON file.
+pub struct JsonFileDatastore {
+    path: PathBuf,
+    content: Mutex<Map<String, Value>>,
+}
+
+impl JsonFileDatastore {
+    /// Opens or creates the datastore. If the path refers to an existing path, then this function
+    /// will attempt to load an existing set of values from it (which can result in an error).
+    /// Otherwise if the path doesn't exist, a new empty datastore will be created.
+    pub fn new<P>(path: P) -> Result<JsonFileDatastore, IoError>
+    where
+        P: Into<PathBuf>,
+    {
+        let path = path.into();
+
+        if !path.exists() {
+            return Ok(JsonFileDatastore {
+                path: path,
+                content: Mutex::new(Map::new()),
+            });
+        }
+
+        let content = {
+            let mut file = fs::File::open(&path)?;
+
+            // We want to support empty files (and treat them as an empty recordset). Unfortunately
+            // `serde_json` will always produce an error if we do this ("unexpected EOF at line 0
+            // column 0"). Therefore we start by reading one byte from the file in order to check
+            // for EOF.
+
+            let mut first_byte = [0];
+            if file.read(&mut first_byte)? == 0 {
+                // File is empty.
+                Map::new()
+            } else {
+                match from_reader::<_, Value>(Cursor::new(first_byte).chain(file)) {
+                    Ok(Value::Null) => Map::new(),
+                    Ok(Value::Object(map)) => map,
+                    Ok(_) => {
+                        return Err(IoError::new(
+                            IoErrorKind::InvalidData,
+                            "expected JSON object",
+                        ));
+                    }
+                    Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)),
+                }
+            }
+        };
+
+        Ok(JsonFileDatastore {
+            path: path,
+            content: Mutex::new(content),
+        })
+    }
+
+    /// Flushes the content of the datastore to the disk.
+    pub fn flush(&self) -> Result<(), IoError> {
+        // Create a temporary file in the same directory as the destination, which avoids the
+        // problem of having a file cleaner delete our file while we use it.
+        let self_path_parent = self.path.parent().ok_or(IoError::new(
+            IoErrorKind::Other,
+            "couldn't get parent directory of destination",
+        ))?;
+        let mut temporary_file = NamedTempFile::new_in(self_path_parent)?;
+
+        let content = self.content.lock().unwrap();
+        to_writer(&mut temporary_file, &*content)?;
+        temporary_file.sync_data()?;
+
+        // It is possible that `persist` fails for a legitimate reason (eg. persisting across
+        // filesystems won't work).
+        let temporary_file = match temporary_file.persist(&self.path) {
+            Ok(_) => return Ok(()),
+            Err(err) => err.file,
+        };
+
+        fs::copy(temporary_file.path(), &self.path)?;
+        Ok(())
+    }
+}
+
+impl Datastore for JsonFileDatastore {
+    fn put(&self, key: Cow<str>, value: Vec<u8>) {
+        let mut content = self.content.lock().unwrap();
+        content.insert(key.into_owned(), Value::String(base64::encode(&value)));
+    }
+
+    fn get(&self, key: &str) -> Option<Vec<u8>> {
+        let content = self.content.lock().unwrap();
+        // If the JSON is malformed, we just ignore the value.
+        content.get(key).and_then(|val| match val {
+            &Value::String(ref s) => base64::decode(s).ok(),
+            _ => None,
+        })
+    }
+
+    fn has(&self, key: &str) -> bool {
+        let content = self.content.lock().unwrap();
+        content.contains_key(key)
+    }
+
+    fn delete(&self, key: &str) -> bool {
+        let mut content = self.content.lock().unwrap();
+        content.remove(key).is_some()
+    }
+
+    fn query<'a>(
+        &'a self,
+        query: Query,
+    ) -> Box<Stream<Item = (String, Vec<u8>), Error = IoError> + 'a> {
+        let content = self.content.lock().unwrap();
+
+        let keys_only = query.keys_only;
+
+        let content_stream = iter_ok(content.iter().filter_map(|(key, value)| {
+            // Skip values that are malformed.
+            let value = if keys_only {
+                Vec::with_capacity(0)
+            } else {
+                match value {
+                    &Value::String(ref s) => {
+                        match base64::decode(s) {
+                            Ok(s) => s,
+                            Err(_) => return None,
+                        }
+                    }
+                    _ => return None,
+                }
+            };
+
+            Some((key.clone(), value))
+        }));
+
+        let collected = naive_apply_query(content_stream, query)
+            .collect()
+            .wait()
+            .unwrap();
+        let output_stream = iter_ok(collected.into_iter());
+        Box::new(output_stream) as Box<_>
+    }
+}
+
+impl Drop for JsonFileDatastore {
+    #[inline]
+    fn drop(&mut self) {
+        let _ = self.flush(); // What do we do in case of error? :-/
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {Query, Order, Filter, FilterTy, FilterOp};
+    use Datastore;
+    use JsonFileDatastore;
+    use futures::{Future, Stream};
+    use tempfile::NamedTempFile;
+
+    #[test]
+    fn open_and_flush() {
+        let temp_file = NamedTempFile::new().unwrap();
+        let datastore = JsonFileDatastore::new(temp_file.path()).unwrap();
+        datastore.flush().unwrap();
+    }
+
+    #[test]
+    fn values_store_and_reload() {
+        let temp_file = NamedTempFile::new().unwrap();
+
+        let datastore = JsonFileDatastore::new(temp_file.path()).unwrap();
+        datastore.put("foo".into(), vec![1, 2, 3]);
+        datastore.put("bar".into(), vec![0, 255, 127]);
+        datastore.flush().unwrap();
+        drop(datastore);
+
+        let reload = JsonFileDatastore::new(temp_file.path()).unwrap();
+        assert_eq!(reload.get("bar").unwrap(), &[0, 255, 127]);
+        assert_eq!(reload.get("foo").unwrap(), &[1, 2, 3]);
+    }
+
+    #[test]
+    fn query_basic() {
+        let temp_file = NamedTempFile::new().unwrap();
+
+        let datastore = JsonFileDatastore::new(temp_file.path()).unwrap();
+        datastore.put("foo1".into(), vec![1, 2, 3]);
+        datastore.put("foo2".into(), vec![4, 5, 6]);
+        datastore.put("foo3".into(), vec![7, 8, 9]);
+        datastore.put("foo4".into(), vec![10, 11, 12]);
+        datastore.put("foo5".into(), vec![13, 14, 15]);
+        datastore.put("bar1".into(), vec![0, 255, 127]);
+        datastore.flush().unwrap();
+
+        let query = datastore
+            .query(Query {
+                prefix: "fo".into(),
+                filters: vec![
+                    Filter {
+                        ty: FilterTy::ValueCompare(vec![6, 7, 8].into()),
+                        operation: FilterOp::Greater,
+                    },
+                ],
+                orders: vec![Order::ByKeyDesc],
+                skip: 1,
+                limit: u64::max_value(),
+                keys_only: false,
+            })
+            .collect()
+            .wait()
+            .unwrap();
+
+        assert_eq!(query[0].0, "foo4");
+        assert_eq!(query[0].1, &[10, 11, 12]);
+        assert_eq!(query[1].0, "foo3");
+        assert_eq!(query[1].1, &[7, 8, 9]);
+    }
+}
diff --git a/datastore/src/lib.rs b/datastore/src/lib.rs
new file mode 100644
index 00000000..b496b778
--- /dev/null
+++ b/datastore/src/lib.rs
@@ -0,0 +1,40 @@
+extern crate base64;
+#[macro_use]
+extern crate futures;
+extern crate serde_json;
+extern crate tempfile;
+
+use futures::Stream;
+use std::borrow::Cow;
+use std::io::Error as IoError;
+
+mod query;
+mod json_file;
+
+pub use self::json_file::JsonFileDatastore;
+pub use self::query::{Query, Order, Filter, FilterTy, FilterOp};
+
+/// Abstraction over any struct that can store `(key, value)` pairs.
+pub trait Datastore {
+    /// Sets the value of a key.
+    fn put(&self, key: Cow<str>, value: Vec<u8>);
+
+    /// Returns the value corresponding to this key.
+    // TODO: use higher-kinded stuff once stable to provide a more generic "accessor" for the data
+    fn get(&self, key: &str) -> Option<Vec<u8>>;
+
+    /// Returns true if the datastore contains the given key.
+    fn has(&self, key: &str) -> bool;
+
+    /// Removes the given key from the datastore. Returns true if the key existed.
+    fn delete(&self, key: &str) -> bool;
+
+    /// Executes a query on the key-value store.
+    ///
+    /// This operation is expensive on some implementations and cheap on others. It is your
+    /// responsibility to pick the right implementation for the right job.
+    fn query<'a>(
+        &'a self,
+        query: Query,
+    ) -> Box<Stream<Item = (String, Vec<u8>), Error = IoError> + 'a>;
+}
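
For illustration, here is a minimal sketch of how the `Datastore` trait above is meant to be
used through the `JsonFileDatastore` implementation from this patch. The file path is
hypothetical, and `flush()` is also invoked automatically on drop:

    extern crate datastore;

    use datastore::{Datastore, JsonFileDatastore};

    fn main() {
        // Opens the file if it exists, or starts with an empty datastore otherwise.
        let store = JsonFileDatastore::new("/tmp/test.json").expect("failed to open datastore");

        // Values are arbitrary bytes; keys are UTF-8 strings.
        store.put("hello".into(), b"world".to_vec());
        assert!(store.has("hello"));
        assert_eq!(store.get("hello").unwrap(), b"world");

        // Persist the current content to disk, then remove the key again.
        store.flush().expect("failed to write to disk");
        assert!(store.delete("hello"));
    }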
diff --git a/datastore/src/query.rs b/datastore/src/query.rs
new file mode 100644
index 00000000..07d4304a
--- /dev/null
+++ b/datastore/src/query.rs
@@ -0,0 +1,311 @@
+
+use futures::{Stream, Future, Async, Poll};
+use futures::stream::{iter_ok, Take as StreamTake, Skip as StreamSkip};
+use std::borrow::Cow;
+use std::cmp::Ordering;
+use std::io::Error as IoError;
+use std::marker::PhantomData;
+use std::vec::IntoIter as VecIntoIter;
+
+/// Description of a query to apply on a datastore.
+///
+/// The various modifications of the dataset are applied in the same order as the fields (prefix,
+/// filters, orders, skip, limit).
+#[derive(Debug, Clone)]
+pub struct Query<'a> {
+    /// Only the keys that start with `prefix` will be returned.
+    pub prefix: Cow<'a, str>,
+    /// Filters to apply on the results.
+    pub filters: Vec<Filter<'a>>,
+    /// How to order the keys. Applied sequentially.
+    pub orders: Vec<Order>,
+    /// Number of elements to skip at the start of the results.
+    pub skip: u64,
+    /// Maximum number of elements in the results.
+    pub limit: u64,
+    /// Only return keys. If true, then all the `Vec`s of the data will be empty.
+    pub keys_only: bool,
+}
+
+/// A filter to apply to the results set.
+#[derive(Debug, Clone)]
+pub struct Filter<'a> {
+    /// Type of filter and value to compare with.
+    pub ty: FilterTy<'a>,
+    /// Comparison operation.
+    pub operation: FilterOp,
+}
+
+/// Type of filter and value to compare with.
+#[derive(Debug, Clone)]
+pub enum FilterTy<'a> {
+    /// Compare the key with a reference value.
+    KeyCompare(Cow<'a, str>),
+    /// Compare the value with a reference value.
+    ValueCompare(Cow<'a, [u8]>),
+}
+
+/// Filtering operation. Keep in mind that anything other than `Equal` and `NotEqual` is a bit
+/// blurry.
+#[derive(Debug, Copy, Clone)]
+pub enum FilterOp {
+    Less,
+    LessOrEqual,
+    Equal,
+    NotEqual,
+    Greater,
+    GreaterOrEqual,
+}
+
+/// Order in which to sort the results of a query.
+#[derive(Debug, Copy, Clone)]
+pub enum Order {
+    /// Put the values in ascending order.
+    ByValueAsc,
+    /// Put the values in descending order.
+    ByValueDesc,
+    /// Put the keys in ascending order.
+    ByKeyAsc,
+    /// Put the keys in descending order.
+    ByKeyDesc,
+}
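
To make the order of application concrete, here is a hypothetical query that keeps only the
entries whose key starts with "/foo" and whose value compares greater than `[0]` (byte-wise
lexicographic order), sorts them by descending key, and returns at most the first 10 results:

    let query = Query {
        prefix: "/foo".into(),
        filters: vec![
            Filter {
                ty: FilterTy::ValueCompare(vec![0].into()),
                operation: FilterOp::Greater,
            },
        ],
        orders: vec![Order::ByKeyDesc],
        skip: 0,
        limit: 10,
        keys_only: false,
    };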
+
+/// Naively applies a query on a set of results.
+pub fn naive_apply_query<'a, S>(stream: S, query: Query<'a>)
+    -> StreamTake<StreamSkip<NaiveKeysOnlyApply<NaiveApplyOrdered<'a,
+           NaiveFiltersApply<'a, NaivePrefixApply<'a, S>, VecIntoIter<Filter<'a>>>>>>>
+    where S: Stream<Item = (String, Vec<u8>), Error = IoError> + 'a
+{
+    let prefixed = naive_apply_prefix(stream, query.prefix);
+    let filtered = naive_apply_filters(prefixed, query.filters.into_iter());
+    let ordered = naive_apply_ordered(filtered, query.orders);
+    let keys_only = naive_apply_keys_only(ordered, query.keys_only);
+    naive_apply_skip_limit(keys_only, query.skip, query.limit)
+}
+
+/// Skips the `skip` first elements of a stream and only returns `limit` elements.
+#[inline]
+pub fn naive_apply_skip_limit<S>(stream: S, skip: u64, limit: u64) -> StreamTake<StreamSkip<S>>
+where
+    S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+{
+    stream.skip(skip).take(limit)
+}
+
+/// Filters the result of a stream to empty values if `keys_only` is true.
+#[inline]
+pub fn naive_apply_keys_only<S>(stream: S, keys_only: bool) -> NaiveKeysOnlyApply<S>
+where
+    S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+{
+    NaiveKeysOnlyApply {
+        keys_only: keys_only,
+        stream: stream,
+    }
+}
+
+/// Returned by `naive_apply_keys_only`.
+#[derive(Debug, Clone)]
+pub struct NaiveKeysOnlyApply<S> {
+    keys_only: bool,
+    stream: S,
+}
+
+impl<S> Stream for NaiveKeysOnlyApply<S>
+where
+    S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+{
+    type Item = (String, Vec<u8>);
+    type Error = IoError;
+
+    #[inline]
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        if self.keys_only {
+            Ok(Async::Ready(try_ready!(self.stream.poll()).map(|mut v| {
+                v.1 = Vec::new();
+                v
+            })))
+        } else {
+            self.stream.poll()
+        }
+    }
+}
+
+/// Filters the result of a stream to only keep the results with a prefix.
+#[inline]
+pub fn naive_apply_prefix<'a, S>(stream: S, prefix: Cow<'a, str>) -> NaivePrefixApply<'a, S>
+where
+    S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+{
+    NaivePrefixApply {
+        prefix: prefix,
+        stream: stream,
+    }
+}
+
+/// Returned by `naive_apply_prefix`.
+#[derive(Debug, Clone)]
+pub struct NaivePrefixApply<'a, S> {
+    prefix: Cow<'a, str>,
+    stream: S,
+}
+
+impl<'a, S> Stream for NaivePrefixApply<'a, S>
+where
+    S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+{
+    type Item = (String, Vec<u8>);
+    type Error = IoError;
+
+    #[inline]
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        loop {
+            let item = try_ready!(self.stream.poll());
+            match item {
+                Some(i) => {
+                    if i.0.starts_with(&*self.prefix) {
+                        return Ok(Async::Ready(Some(i)));
+                    }
+                }
+                None => return Ok(Async::Ready(None)),
+            }
+        }
+    }
+}
+
+/// Applies orderings on the stream data. Will simply pass data through if the list of orderings
+/// is empty. Otherwise will need to collect.
+pub fn naive_apply_ordered<'a, S, I>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S>
+where
+    S: Stream<Item = (String, Vec<u8>), Error = IoError> + 'a,
+    I: IntoIterator<Item = Order>,
+    I::IntoIter: 'a,
+{
+    let orders_iter = orders_iter.into_iter();
+    if orders_iter.size_hint().1 == Some(0) {
+        return NaiveApplyOrdered { inner: NaiveApplyOrderedInner::PassThrough(stream) };
+    }
+
+    let collected = stream
+        .collect()
+        .and_then(move |mut collected| {
+            for order in orders_iter {
+                match order {
+                    Order::ByValueAsc => {
+                        collected.sort_by(|a, b| a.1.cmp(&b.1));
+                    }
+                    Order::ByValueDesc => {
+                        collected.sort_by(|a, b| b.1.cmp(&a.1));
+                    }
+                    Order::ByKeyAsc => {
+                        collected.sort_by(|a, b| a.0.cmp(&b.0));
+                    }
+                    Order::ByKeyDesc => {
+                        collected.sort_by(|a, b| b.0.cmp(&a.0));
+                    }
+                }
+            }
+            Ok(iter_ok(collected.into_iter()))
+        })
+        .flatten_stream();
+
+    NaiveApplyOrdered { inner: NaiveApplyOrderedInner::Collected(Box::new(collected)) }
+}
+
+/// Returned by `naive_apply_ordered`.
+pub struct NaiveApplyOrdered<'a, S> {
+    inner: NaiveApplyOrderedInner<'a, S>,
+}
+
+enum NaiveApplyOrderedInner<'a, S> {
+    PassThrough(S),
+    Collected(Box<Stream<Item = (String, Vec<u8>), Error = IoError> + 'a>),
+}
+
+impl<'a, S> Stream for NaiveApplyOrdered<'a, S>
+where
+    S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+{
+    type Item = (String, Vec<u8>);
+    type Error = IoError;
+
+    #[inline]
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        match self.inner {
+            NaiveApplyOrderedInner::PassThrough(ref mut s) => s.poll(),
+            NaiveApplyOrderedInner::Collected(ref mut s) => s.poll(),
+        }
+    }
+}
+
+/// Filters the result of a stream to apply a set of filters.
+#[inline]
+pub fn naive_apply_filters<'a, S, I>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I>
+where
+    S: Stream<Item = (String, Vec<u8>), Error = IoError>,
+    I: Iterator<Item = Filter<'a>> + Clone,
+{
+    NaiveFiltersApply {
+        filters: filters,
+        stream: stream,
+        marker: PhantomData,
+    }
+}
+
+/// Returned by `naive_apply_filters`.
+#[derive(Debug, Clone)]
+pub struct NaiveFiltersApply<'a, S, I> {
+    filters: I,
+    stream: S,
+    marker: PhantomData<&'a ()>,
+}
+
+impl<'a, S, I> Stream for NaiveFiltersApply<'a, S, I>
+where
+    S: Stream<
+        Item = (String, Vec<u8>),
+        Error = IoError,
+    >,
+    I: Iterator<Item = Filter<'a>> + Clone,
+{
+    type Item = (String, Vec<u8>);
+    type Error = IoError;
+
+    #[inline]
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        'outer: loop {
+            let item = try_ready!(self.stream.poll());
+            match item {
+                Some(i) => {
+                    for filter in self.filters.clone() {
+                        if !naive_filter_test(&i, &filter) {
+                            continue 'outer;
+                        }
+                    }
+                    return Ok(Async::Ready(Some(i)));
+                }
+                None => return Ok(Async::Ready(None)),
+            }
+        }
+    }
+}
+
+#[inline]
+fn naive_filter_test(entry: &(String, Vec<u8>), filter: &Filter) -> bool {
+    let (expected_ordering, revert_expected) = match filter.operation {
+        FilterOp::Less => (Ordering::Less, false),
+        FilterOp::LessOrEqual => (Ordering::Greater, true),
+        FilterOp::Equal => (Ordering::Equal, false),
+        FilterOp::NotEqual => (Ordering::Equal, true),
+        FilterOp::Greater => (Ordering::Greater, false),
+        FilterOp::GreaterOrEqual => (Ordering::Less, true),
+    };
+
+    match filter.ty {
+        FilterTy::KeyCompare(ref ref_value) => {
+            ((&*entry.0).cmp(&**ref_value) == expected_ordering) != revert_expected
+        }
+        FilterTy::ValueCompare(ref ref_value) => {
+            ((&*entry.1).cmp(&**ref_value) == expected_ordering) != revert_expected
+        }
+    }
+}
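
A note on `naive_filter_test`: every operator is encoded as a pair
`(expected_ordering, revert_expected)` so that a single three-way comparison covers all six
cases. `GreaterOrEqual`, for instance, becomes `(Ordering::Less, true)`, i.e. "not less than".
A standalone sketch of the equivalence (hypothetical helper, not part of the patch):

    use std::cmp::Ordering;

    // "a >= b" is the same as "not (a < b)": compare once, then possibly negate.
    fn greater_or_equal(a: &[u8], b: &[u8]) -> bool {
        let (expected_ordering, revert_expected) = (Ordering::Less, true);
        (a.cmp(b) == expected_ordering) != revert_expected
    }

    fn main() {
        assert!(greater_or_equal(b"b", b"a"));  // Greater, hence not Less
        assert!(greater_or_equal(b"a", b"a"));  // Equal, hence not Less
        assert!(!greater_or_equal(b"a", b"b")); // Less
    }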
From 03e75f509a29d2b7fe7b283ed5ff94ae02e00003 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Tue, 7 Nov 2017 18:40:25 +0100
Subject: [PATCH 2/4] Fix concerns

---
 datastore/Cargo.toml       |  3 ++-
 datastore/src/json_file.rs | 19 +++++++++++--------
 datastore/src/lib.rs       |  1 +
 3 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/datastore/Cargo.toml b/datastore/Cargo.toml
index eccf68e1..a65c2f8c 100644
--- a/datastore/Cargo.toml
+++ b/datastore/Cargo.toml
@@ -1,10 +1,11 @@
 [package]
 name = "datastore"
 version = "0.1.0"
-authors = ["pierre <pierre.krieger1708@gmail.com>"]
+authors = ["Parity Technologies <admin@parity.io>"]

 [dependencies]
 base64 = "0.7"
 futures = "0.1"
+parking_lot = "0.4"
 serde_json = "1.0"
 tempfile = "2.2"
diff --git a/datastore/src/json_file.rs b/datastore/src/json_file.rs
index b9d2e7c9..c697d399 100644
--- a/datastore/src/json_file.rs
+++ b/datastore/src/json_file.rs
@@ -14,7 +14,7 @@ use std::io::Error as IoError;
 use std::io::ErrorKind as IoErrorKind;
 use std::io::Read;
 use std::path::PathBuf;
-use std::sync::Mutex;
+use parking_lot::Mutex;
 use tempfile::NamedTempFile;

 /// Implementation of `Datastore` that uses a single plain JSON file.
@@ -83,7 +83,7 @@ impl JsonFileDatastore {
         ))?;
         let mut temporary_file = NamedTempFile::new_in(self_path_parent)?;

-        let content = self.content.lock().unwrap();
+        let content = self.content.lock();
         to_writer(&mut temporary_file, &*content)?;
         temporary_file.sync_data()?;

@@ -101,12 +101,12 @@ impl JsonFileDatastore {

 impl Datastore for JsonFileDatastore {
     fn put(&self, key: Cow<str>, value: Vec<u8>) {
-        let mut content = self.content.lock().unwrap();
+        let mut content = self.content.lock();
         content.insert(key.into_owned(), Value::String(base64::encode(&value)));
     }

     fn get(&self, key: &str) -> Option<Vec<u8>> {
-        let content = self.content.lock().unwrap();
+        let content = self.content.lock();
         // If the JSON is malformed, we just ignore the value.
         content.get(key).and_then(|val| match val {
             &Value::String(ref s) => base64::decode(s).ok(),
@@ -115,12 +115,12 @@ impl Datastore for JsonFileDatastore {
     }

     fn has(&self, key: &str) -> bool {
-        let content = self.content.lock().unwrap();
+        let content = self.content.lock();
         content.contains_key(key)
     }

     fn delete(&self, key: &str) -> bool {
-        let mut content = self.content.lock().unwrap();
+        let mut content = self.content.lock();
         content.remove(key).is_some()
     }

@@ -128,7 +128,7 @@ impl Datastore for JsonFileDatastore {
         &'a self,
         query: Query,
     ) -> Box<Stream<Item = (String, Vec<u8>), Error = IoError> + 'a> {
-        let content = self.content.lock().unwrap();
+        let content = self.content.lock();

         let keys_only = query.keys_only;

@@ -151,10 +151,13 @@ impl Datastore for JsonFileDatastore {
             Some((key.clone(), value))
         }));

+        // `content_stream` reads from the content of the `Mutex`, so we need to clone the data
+        // into a `Vec` before returning.
         let collected = naive_apply_query(content_stream, query)
             .collect()
             .wait()
-            .unwrap();
+            .expect("can only fail if either `naive_apply_query` or `content_stream` produces \
+                     an error, which can't happen");
         let output_stream = iter_ok(collected.into_iter());
         Box::new(output_stream) as Box<_>
     }
diff --git a/datastore/src/lib.rs b/datastore/src/lib.rs
index b496b778..9c99f0a9 100644
--- a/datastore/src/lib.rs
+++ b/datastore/src/lib.rs
@@ -1,6 +1,7 @@
 extern crate base64;
 #[macro_use]
 extern crate futures;
+extern crate parking_lot;
 extern crate serde_json;
 extern crate tempfile;

From 49823fc9859341e7234c12e872baf9b3713f1d35 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Wed, 8 Nov 2017 10:32:20 +0100
Subject: [PATCH 3/4] Simplify flush()

---
 datastore/src/json_file.rs | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/datastore/src/json_file.rs b/datastore/src/json_file.rs
index c697d399..9dc81aee 100644
--- a/datastore/src/json_file.rs
+++ b/datastore/src/json_file.rs
@@ -87,14 +87,10 @@ impl JsonFileDatastore {
         to_writer(&mut temporary_file, &*content)?;
         temporary_file.sync_data()?;

-        // It is possible that `persist` fails for a legitimate reason (eg. persisting across
-        // filesystems won't work).
-        let temporary_file = match temporary_file.persist(&self.path) {
-            Ok(_) => return Ok(()),
-            Err(err) => err.file,
-        };
-
-        fs::copy(temporary_file.path(), &self.path)?;
+        // Note that `persist` will fail if we try to persist across filesystems. However that
+        // shouldn't happen since we created the temporary file in the same directory as the final
+        // path.
+        temporary_file.persist(&self.path)?;
         Ok(())
     }
 }
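
The simplified `flush()` above is an instance of the standard write-to-temporary-then-rename
pattern: because the temporary file is created in the same directory (and therefore on the same
filesystem) as the destination, the final `persist()` is a plain atomic rename, and readers can
never observe a half-written file. Stripped of the datastore specifics, the pattern looks
roughly like this (a sketch with illustrative names, using the same `tempfile` crate):

    extern crate tempfile;

    use std::io::{self, Write};
    use std::path::Path;
    use tempfile::NamedTempFile;

    fn write_atomically(destination: &Path, data: &[u8]) -> io::Result<()> {
        let dir = destination.parent()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no parent directory"))?;

        // Created on the same filesystem as `destination`, so that the rename
        // performed by `persist` cannot fail with a cross-device error.
        let mut tmp = NamedTempFile::new_in(dir)?;
        tmp.write_all(data)?;
        tmp.sync_data()?;

        // Atomically replace the destination with the fully-written file.
        tmp.persist(destination)?;
        Ok(())
    }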
From 6a2d0613744e453ccf404509afe4edad1af31070 Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Thu, 9 Nov 2017 16:45:28 +0100
Subject: [PATCH 4/4] Add license to code and some documentation

---
 datastore/src/json_file.rs | 31 ++++++++++++++++++++++++++++++-
 datastore/src/lib.rs       | 20 ++++++++++++++++++++
 datastore/src/query.rs     | 19 +++++++++++++++++++
 3 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/datastore/src/json_file.rs b/datastore/src/json_file.rs
index 9dc81aee..a6c1b2c5 100644
--- a/datastore/src/json_file.rs
+++ b/datastore/src/json_file.rs
@@ -1,3 +1,22 @@
+// Copyright 2017 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.

 use Datastore;
 use base64;
@@ -74,6 +93,10 @@ impl JsonFileDatastore {
     }

     /// Flushes the content of the datastore to the disk.
+    ///
+    /// This function can only fail in case of a disk access error. If an error occurs, any change
+    /// to the datastore that was performed since the last successful flush will be lost. No data
+    /// will be corrupted.
     pub fn flush(&self) -> Result<(), IoError> {
         // Create a temporary file in the same directory as the destination, which avoids the
         // problem of having a file cleaner delete our file while we use it.
@@ -162,7 +185,13 @@ impl Datastore for JsonFileDatastore {
 impl Drop for JsonFileDatastore {
     #[inline]
     fn drop(&mut self) {
-        let _ = self.flush(); // What do we do in case of error? :-/
+        // Unfortunately there's not much we can do here in case of an error, as panicking would be
+        // very bad. Similar to `File`, the user should take care to call `flush()` before dropping
+        // the datastore.
+        //
+        // If an error happens here, any change since the last successful flush will be lost, but
+        // the data will not be corrupted.
+        let _ = self.flush();
     }
 }
diff --git a/datastore/src/lib.rs b/datastore/src/lib.rs
index 9c99f0a9..227c78fd 100644
--- a/datastore/src/lib.rs
+++ b/datastore/src/lib.rs
@@ -1,3 +1,23 @@
+// Copyright 2017 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
 extern crate base64;
 #[macro_use]
 extern crate futures;
 extern crate parking_lot;
 extern crate serde_json;
 extern crate tempfile;
diff --git a/datastore/src/query.rs b/datastore/src/query.rs
index 07d4304a..2f57dc32 100644
--- a/datastore/src/query.rs
+++ b/datastore/src/query.rs
@@ -1,3 +1,22 @@
+// Copyright 2017 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
 use futures::{Stream, Future, Async, Poll};
 use futures::stream::{iter_ok, Take as StreamTake, Skip as StreamSkip};
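
To close the loop, here is a hypothetical end-to-end use of the finished crate, driving
`query()` with the blocking combinators from futures 0.1 (the same `collect().wait()` pattern
the tests use); the path is illustrative:

    extern crate datastore;
    extern crate futures;

    use datastore::{Datastore, JsonFileDatastore, Order, Query};
    use futures::{Future, Stream};

    fn main() {
        let store = JsonFileDatastore::new("/tmp/datastore.json").unwrap();
        store.put("/foo/1".into(), vec![1]);
        store.put("/foo/2".into(), vec![2]);
        store.put("/bar/1".into(), vec![3]);

        // `query()` returns a `Stream`; the JSON datastore materializes its results
        // up front, so blocking on it with `wait()` is fine here.
        let results = store
            .query(Query {
                prefix: "/foo".into(),
                filters: vec![],
                orders: vec![Order::ByKeyAsc],
                skip: 0,
                limit: u64::max_value(),
                keys_only: false,
            })
            .collect()
            .wait()
            .unwrap();

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0, "/foo/1");
    }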