From aeceb04d506c01ff3beabbee26705d35165b0d40 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 15 Nov 2017 17:24:28 +0100 Subject: [PATCH] General cleanup and rework --- Cargo.toml | 1 - datastore/Cargo.toml | 2 +- datastore/src/json_file.rs | 141 ++++++++------ datastore/src/lib.rs | 61 ++++-- datastore/src/query.rs | 34 ++-- libp2p-peer/Cargo.toml | 10 - libp2p-peer/src/lib.rs | 55 ------ libp2p-peerstore/Cargo.toml | 12 +- libp2p-peerstore/src/json_peerstore.rs | 154 ++++++++++++++++ libp2p-peerstore/src/lib.rs | 54 +++++- libp2p-peerstore/src/memory_peerstore.rs | 199 ++++++++++---------- libp2p-peerstore/src/peer_info.rs | 224 +++++++++++++++++++---- libp2p-peerstore/src/peerstore.rs | 140 +++++++++----- libp2p-peerstore/src/peerstore_tests.rs | 139 ++++++++++++++ multihash/src/lib.rs | 22 ++- 15 files changed, 910 insertions(+), 338 deletions(-) delete mode 100644 libp2p-peer/Cargo.toml delete mode 100644 libp2p-peer/src/lib.rs create mode 100644 libp2p-peerstore/src/json_peerstore.rs create mode 100644 libp2p-peerstore/src/peerstore_tests.rs diff --git a/Cargo.toml b/Cargo.toml index dd68f455..ddec2615 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,6 @@ members = [ "multistream-select", "datastore", "libp2p-host", - "libp2p-peer", "libp2p-peerstore", "libp2p-transport", "libp2p-tcp-transport", diff --git a/datastore/Cargo.toml b/datastore/Cargo.toml index e2e7e1f7..f7b24ebc 100644 --- a/datastore/Cargo.toml +++ b/datastore/Cargo.toml @@ -5,8 +5,8 @@ authors = ["Parity Technologies "] [dependencies] base64 = "0.7" +chashmap = "2.2" futures = "0.1" -parking_lot = "0.4" serde = "1.0" serde_json = "1.0" tempfile = "2.2" diff --git a/datastore/src/json_file.rs b/datastore/src/json_file.rs index da847e6e..34231d05 100644 --- a/datastore/src/json_file.rs +++ b/datastore/src/json_file.rs @@ -18,35 +18,37 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +//! Implementation of `Datastore` that uses a single plain JSON file for storage. + use Datastore; +use chashmap::{CHashMap, WriteGuard}; use futures::Future; use futures::stream::{Stream, iter_ok}; -use parking_lot::Mutex; use query::{Query, naive_apply_query}; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::{Map, from_value, to_value, from_reader, to_writer}; use serde_json::value::Value; use std::borrow::Cow; -use std::collections::HashMap; use std::fs; use std::io::Cursor; use std::io::Error as IoError; use std::io::ErrorKind as IoErrorKind; use std::io::Read; +use std::ops::{Deref, DerefMut}; use std::path::PathBuf; use tempfile::NamedTempFile; /// Implementation of `Datastore` that uses a single plain JSON file. pub struct JsonFileDatastore - where T: Serialize + DeserializeOwned + where T: Serialize + DeserializeOwned + Clone { path: PathBuf, - content: Mutex>, + content: CHashMap, } impl JsonFileDatastore - where T: Serialize + DeserializeOwned + where T: Serialize + DeserializeOwned + Clone { /// Opens or creates the datastore. If the path refers to an existing path, then this function /// will attempt to load an existing set of values from it (which can result in an error). @@ -59,7 +61,7 @@ impl JsonFileDatastore if !path.exists() { return Ok(JsonFileDatastore { path: path, - content: Mutex::new(HashMap::new()), + content: CHashMap::new(), }); } @@ -74,12 +76,12 @@ impl JsonFileDatastore let mut first_byte = [0]; if file.read(&mut first_byte)? == 0 { // File is empty. - HashMap::new() + CHashMap::new() } else { match from_reader::<_, Value>(Cursor::new(first_byte).chain(file)) { - Ok(Value::Null) => HashMap::new(), + Ok(Value::Null) => CHashMap::new(), Ok(Value::Object(map)) => { - let mut out = HashMap::with_capacity(map.len()); + let mut out = CHashMap::with_capacity(map.len()); for (key, value) in map.into_iter() { let value = match from_value(value) { Ok(v) => v, @@ -99,10 +101,7 @@ impl JsonFileDatastore } }; - Ok(JsonFileDatastore { - path: path, - content: Mutex::new(content), - }) + Ok(JsonFileDatastore { path: path, content: content }) } /// Flushes the content of the datastore to the disk. @@ -110,22 +109,24 @@ impl JsonFileDatastore /// This function can only fail in case of a disk access error. If an error occurs, any change /// to the datastore that was performed since the last successful flush will be lost. No data /// will be corrupted. - pub fn flush(&self) -> Result<(), IoError> { + pub fn flush(&self) -> Result<(), IoError> + where T: Clone + { // Create a temporary file in the same directory as the destination, which avoids the // problem of having a file cleaner delete our file while we use it. let self_path_parent = self.path - .parent() - .ok_or(IoError::new( + .parent() + .ok_or(IoError::new( IoErrorKind::Other, "couldn't get parent directory of destination", ))?; let mut temporary_file = NamedTempFile::new_in(self_path_parent)?; - let content = self.content.lock(); + let content = self.content.clone().into_iter(); to_writer( &mut temporary_file, - &content.iter().map(|(k, v)| (k.clone(), to_value(v).unwrap())).collect::>(), - )?; // TODO: panic! + &content.map(|(k, v)| (k, to_value(v).unwrap())).collect::>(), + )?; temporary_file.sync_data()?; // Note that `persist` will fail if we try to persist across filesystems. However that @@ -136,60 +137,75 @@ impl JsonFileDatastore } } -impl Datastore for JsonFileDatastore - where T: Clone + Serialize + DeserializeOwned + Default + Ord + 'static +impl<'a, T> Datastore for &'a JsonFileDatastore + where T: Clone + Serialize + DeserializeOwned + Default + PartialOrd + 'static { + type Entry = JsonFileDatastoreEntry<'a, T>; + type QueryResult = Box + 'a>; + #[inline] - fn put(&self, key: Cow, value: T) { - let mut content = self.content.lock(); - content.insert(key.into_owned(), value); + fn lock(self, key: Cow) -> Option { + self.content.get_mut(&key.into_owned()).map(JsonFileDatastoreEntry) } - fn get(&self, key: &str) -> Option { - let content = self.content.lock(); - // If the JSON is malformed, we just ignore the value. - content.get(key).cloned() + #[inline] + fn lock_or_create(self, key: Cow) -> Self::Entry { + loop { + self.content.upsert(key.clone().into_owned(), || Default::default(), |_| {}); + + // There is a slight possibility that another thread will delete our value in this + // small interval. If this happens, we just loop and reinsert the value again until + // we can acquire a lock. + if let Some(v) = self.content.get_mut(&key.clone().into_owned()) { + return JsonFileDatastoreEntry(v); + } + } } - fn has(&self, key: &str) -> bool { - let content = self.content.lock(); - content.contains_key(key) + #[inline] + fn put(self, key: Cow, value: T) { + self.content.insert(key.into_owned(), value); } - fn delete(&self, key: &str) -> bool { - let mut content = self.content.lock(); - content.remove(key).is_some() + #[inline] + fn get(self, key: &str) -> Option { + self.content.get(&key.to_owned()).map(|v| v.clone()) } - fn query<'a>( - &'a self, - query: Query, - ) -> Box + 'a> { - let content = self.content.lock(); + #[inline] + fn has(self, key: &str) -> bool { + self.content.contains_key(&key.to_owned()) + } + + #[inline] + fn delete(self, key: &str) -> Option { + self.content.remove(&key.to_owned()) + } + + fn query(self, query: Query) -> Self::QueryResult { + let content = self.content.clone(); let keys_only = query.keys_only; - let content_stream = iter_ok(content.iter().filter_map(|(key, value)| { + let content_stream = iter_ok(content.into_iter().filter_map(|(key, value)| { // Skip values that are malformed. - let value = if keys_only { Default::default() } else { value.clone() }; - - Some((key.clone(), value)) + let value = if keys_only { Default::default() } else { value }; + Some((key, value)) })); // `content_stream` reads from the content of the `Mutex`, so we need to clone the data // into a `Vec` before returning. - let collected = naive_apply_query(content_stream, query) - .collect() - .wait() - .expect("can only fail if either `naive_apply_query` or `content_stream` produce \ - an error, which cann't happen"); + let collected = naive_apply_query(content_stream, query).collect().wait().expect( + "can only fail if either `naive_apply_query` or `content_stream` produce \ + an error, which cann't happen", + ); let output_stream = iter_ok(collected.into_iter()); Box::new(output_stream) as Box<_> } } impl Drop for JsonFileDatastore - where T: Serialize + DeserializeOwned + where T: Serialize + DeserializeOwned + Clone { #[inline] fn drop(&mut self) { @@ -203,6 +219,27 @@ impl Drop for JsonFileDatastore } } +/// Implementation of `Datastore` that uses a single plain JSON file. +pub struct JsonFileDatastoreEntry<'a, T>(WriteGuard<'a, String, T>) where T: 'a; + +impl<'a, T> Deref for JsonFileDatastoreEntry<'a, T> + where T: 'a +{ + type Target = T; + + fn deref(&self) -> &T { + &*self.0 + } +} + +impl<'a, T> DerefMut for JsonFileDatastoreEntry<'a, T> + where T: 'a +{ + fn deref_mut(&mut self) -> &mut T { + &mut *self.0 + } +} + #[cfg(test)] mod tests { use {Query, Order, Filter, FilterTy, FilterOp}; @@ -259,9 +296,9 @@ mod tests { limit: u64::max_value(), keys_only: false, }) - .collect() - .wait() - .unwrap(); + .collect() + .wait() + .unwrap(); assert_eq!(query[0].0, "foo4"); assert_eq!(query[0].1, &[10, 11, 12]); diff --git a/datastore/src/lib.rs b/datastore/src/lib.rs index 57fb50e7..cc1829b1 100644 --- a/datastore/src/lib.rs +++ b/datastore/src/lib.rs @@ -19,9 +19,9 @@ // DEALINGS IN THE SOFTWARE. extern crate base64; +extern crate chashmap; #[macro_use] extern crate futures; -extern crate parking_lot; extern crate serde; extern crate serde_json; extern crate tempfile; @@ -29,34 +29,67 @@ extern crate tempfile; use futures::Stream; use std::borrow::Cow; use std::io::Error as IoError; +use std::ops::DerefMut; mod query; mod json_file; -pub use self::json_file::JsonFileDatastore; +pub use self::json_file::{JsonFileDatastore, JsonFileDatastoreEntry}; pub use self::query::{Query, Order, Filter, FilterTy, FilterOp}; /// Abstraction over any struct that can store `(key, value)` pairs. pub trait Datastore { - /// Sets the value of a key. - fn put(&self, key: Cow, value: T); + /// Locked entry. + type Entry: DerefMut; + /// Output of a query. + type QueryResult: Stream; - /// Returns the value corresponding to this key. - // TODO: use higher-kinded stuff once stable to provide a more generic "accessor" for the data - fn get(&self, key: &str) -> Option; + /// Sets the value of a key. + #[inline] + fn put(self, key: Cow, value: T) + where Self: Sized + { + *self.lock_or_create(key) = value; + } + + /// Checks if an entry exists, and if so locks it. + /// + /// Trying to lock a value that is already locked will block, therefore you should keep locks + /// for a duration that is as short as possible. + fn lock(self, key: Cow) -> Option; + + /// Locks an entry if it exists, or creates it otherwise. + /// + /// Same as `put` followed with `lock`, except that it is atomic. + fn lock_or_create(self, key: Cow) -> Self::Entry; + + /// Returns the value corresponding to this key by cloning it. + #[inline] + fn get(self, key: &str) -> Option + where Self: Sized, + T: Clone + { + self.lock(key.into()).map(|v| v.clone()) + } /// Returns true if the datastore contains the given key. - fn has(&self, key: &str) -> bool; + /// + /// > **Note**: Keep in mind that using this operation is probably racy. A secondary thread + /// > can delete a key right after you called `has()`. In other words, this function + /// > returns whether an entry with that key existed in the short past. + #[inline] + fn has(self, key: &str) -> bool + where Self: Sized + { + self.lock(key.into()).is_some() + } - /// Removes the given key from the datastore. Returns true if the key existed. - fn delete(&self, key: &str) -> bool; + /// Removes the given key from the datastore. Returns the old value if the key existed. + fn delete(self, key: &str) -> Option; /// Executes a query on the key-value store. /// /// This operation is expensive on some implementations and cheap on others. It is your /// responsibility to pick the right implementation for the right job. - fn query<'a>( - &'a self, - query: Query, - ) -> Box + 'a>; + fn query(self, query: Query) -> Self::QueryResult; } diff --git a/datastore/src/query.rs b/datastore/src/query.rs index a83c353b..68423752 100644 --- a/datastore/src/query.rs +++ b/datastore/src/query.rs @@ -92,7 +92,7 @@ pub enum Order { pub fn naive_apply_query<'a, S, V>(stream: S, query: Query<'a, V>) -> StreamTake, VecIntoIter>>, V>>>> where S: Stream + 'a, - V: Clone + Ord + Default + 'static + V: Clone + PartialOrd + Default + 'static { let prefixed = naive_apply_prefix(stream, query.prefix); let filtered = naive_apply_filters(prefixed, query.filters.into_iter()); @@ -129,7 +129,7 @@ pub struct NaiveKeysOnlyApply { impl Stream for NaiveKeysOnlyApply where S: Stream, - T: Default + T: Default { type Item = (String, T); type Error = IoError; @@ -188,9 +188,9 @@ impl<'a, S, T> Stream for NaivePrefixApply<'a, S> /// is empty. Otherwise will need to collect. pub fn naive_apply_ordered<'a, S, I, V>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S, V> where S: Stream + 'a, - I: IntoIterator, - I::IntoIter: 'a, - V: Ord + 'static + I: IntoIterator, + I::IntoIter: 'a, + V: PartialOrd + 'static { let orders_iter = orders_iter.into_iter(); if orders_iter.size_hint().1 == Some(0) { @@ -198,26 +198,26 @@ pub fn naive_apply_ordered<'a, S, I, V>(stream: S, orders_iter: I) -> NaiveApply } let collected = stream.collect() - .and_then(move |mut collected| { + .and_then(move |mut collected| { for order in orders_iter { match order { Order::ByValueAsc => { - collected.sort_by(|a, b| a.1.cmp(&b.1)); + collected.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal)); } Order::ByValueDesc => { - collected.sort_by(|a, b| b.1.cmp(&a.1)); + collected.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal)); } Order::ByKeyAsc => { - collected.sort_by(|a, b| a.0.cmp(&b.0)); + collected.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal)); } Order::ByKeyDesc => { - collected.sort_by(|a, b| b.0.cmp(&a.0)); + collected.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(Ordering::Equal)); } } } Ok(iter_ok(collected.into_iter())) }) - .flatten_stream(); + .flatten_stream(); NaiveApplyOrdered { inner: NaiveApplyOrderedInner::Collected(Box::new(collected)) } } @@ -251,8 +251,8 @@ impl<'a, S, V> Stream for NaiveApplyOrdered<'a, S, V> #[inline] pub fn naive_apply_filters<'a, S, I, V>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I> where S: Stream, - I: Iterator> + Clone, - V: 'a + I: Iterator> + Clone, + V: 'a { NaiveFiltersApply { filters: filters, @@ -271,8 +271,8 @@ pub struct NaiveFiltersApply<'a, S, I> { impl<'a, S, I, T> Stream for NaiveFiltersApply<'a, S, I> where S: Stream, - I: Iterator> + Clone, - T: Ord + 'a + I: Iterator> + Clone, + T: PartialOrd + 'a { type Item = (String, T); type Error = IoError; @@ -298,7 +298,7 @@ impl<'a, S, I, T> Stream for NaiveFiltersApply<'a, S, I> #[inline] fn naive_filter_test(entry: &(String, T), filter: &Filter) -> bool - where T: Ord + where T: PartialOrd { let (expected_ordering, revert_expected) = match filter.operation { FilterOp::Equal => (Ordering::Equal, false), @@ -314,7 +314,7 @@ fn naive_filter_test(entry: &(String, T), filter: &Filter) -> bool ((&*entry.0).cmp(&**ref_value) == expected_ordering) != revert_expected } FilterTy::ValueCompare(ref ref_value) => { - (entry.1.cmp(&**ref_value) == expected_ordering) != revert_expected + (entry.1.partial_cmp(&**ref_value) == Some(expected_ordering)) != revert_expected } } } diff --git a/libp2p-peer/Cargo.toml b/libp2p-peer/Cargo.toml deleted file mode 100644 index 3ef2bd53..00000000 --- a/libp2p-peer/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "libp2p-peer" -version = "0.1.0" -authors = ["Parity Technologies "] - -[dependencies] -base58 = "0.1" - -[dev-dependencies] -multihash = "0.6" diff --git a/libp2p-peer/src/lib.rs b/libp2p-peer/src/lib.rs deleted file mode 100644 index ee11532b..00000000 --- a/libp2p-peer/src/lib.rs +++ /dev/null @@ -1,55 +0,0 @@ -extern crate base58; - -use std::fmt; -use base58::ToBase58; - -/// A PeerId is a reference to a multihash -/// Ideally we would want to store the Multihash object directly here but because -/// the multihash package is lacking some things right now, lets store a reference to -/// some bytes that represent the full bytes of the multihash -#[derive(PartialEq, Eq, Hash, Debug, Clone)] -pub struct PeerId { - /// Rereference to multihash bytes - multihash: Vec -} - -impl fmt::Display for PeerId { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.to_base58()) - } -} - -impl PeerId { - /// Create a new PeerId from a multihash - pub fn new(mh: Vec) -> PeerId { - PeerId { multihash: mh } - } - - /// Outputs the multihash as a Base58 string, - /// this is what we use as our stringified version of the ID - pub fn to_base58(&self) -> String { - self.multihash.to_base58() - } -} - - -#[cfg(test)] -mod tests { - extern crate multihash; - use self::multihash::{encode, Hash}; - use super::{PeerId}; - - #[test] - fn peer_id_produces_correct_b58() { - let multihash_bytes = encode(Hash::SHA2256, b"hello world").unwrap(); - let peer_id = PeerId::new(multihash_bytes); - assert_eq!(peer_id.to_base58(), "QmaozNR7DZHQK1ZcU9p7QdrshMvXqWK6gpu5rmrkPdT3L4"); - } - - #[test] - fn string_key_concatenates_correctly() { - let multihash_bytes = encode(Hash::SHA2256, b"hello world").unwrap(); - let peer_id = PeerId::new(multihash_bytes); - assert_eq!(peer_id.string_key("hello"), "QmaozNR7DZHQK1ZcU9p7QdrshMvXqWK6gpu5rmrkPdT3L4/hello"); - } -} diff --git a/libp2p-peerstore/Cargo.toml b/libp2p-peerstore/Cargo.toml index ffef97df..c66e682a 100644 --- a/libp2p-peerstore/Cargo.toml +++ b/libp2p-peerstore/Cargo.toml @@ -4,6 +4,14 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -error-chain = "0.11" +base58 = "0.1.0" +datastore = { path = "../datastore" } +futures = "0.1.0" +owning_ref = "0.3.3" multiaddr = "0.2" -libp2p-peer = { path = "../libp2p-peer" } +multihash = { path = "../multihash" } +serde = "1.0" +serde_derive = "1.0" + +[dev-dependencies] +tempfile = "2.2" diff --git a/libp2p-peerstore/src/json_peerstore.rs b/libp2p-peerstore/src/json_peerstore.rs new file mode 100644 index 00000000..83b53027 --- /dev/null +++ b/libp2p-peerstore/src/json_peerstore.rs @@ -0,0 +1,154 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the `Peerstore` trait that uses a single JSON file as backend. + +use super::TTL; +use PeerId; +use base58::{FromBase58, ToBase58}; +use datastore::{Datastore, Query, JsonFileDatastore, JsonFileDatastoreEntry}; +use futures::{Future, Stream}; +use multiaddr::Multiaddr; +use multihash::Multihash; +use peer_info::{PeerInfo, AddAddrBehaviour}; +use peerstore::{Peerstore, PeerAccess}; +use std::io::Error as IoError; +use std::iter; +use std::path::PathBuf; +use std::vec::IntoIter as VecIntoIter; + +/// Peerstore backend that uses a Json file. +pub struct JsonPeerstore { + store: JsonFileDatastore, +} + +impl JsonPeerstore { + /// Opens a new peerstore tied to a JSON file at the given path. + /// + /// If the file exists, this function will open it. In any case, flushing the peerstore or + /// destroying it will write to the file. + #[inline] + pub fn new

(path: P) -> Result + where P: Into + { + Ok(JsonPeerstore { store: JsonFileDatastore::new(path)? }) + } + + /// Flushes the content of the peer store to the disk. + /// + /// This function can only fail in case of a disk access error. If an error occurs, any change + /// to the peerstore that was performed since the last successful flush will be lost. No data + /// will be corrupted. + #[inline] + pub fn flush(&self) -> Result<(), IoError> { + self.store.flush() + } +} + +impl<'a> Peerstore for &'a JsonPeerstore { + type PeerAccess = JsonPeerstoreAccess<'a>; + type PeersIter = Box>; + + #[inline] + fn peer(self, peer_id: &PeerId) -> Option { + let hash = peer_id.clone().as_bytes().to_base58(); + self.store.lock(hash.into()).map(JsonPeerstoreAccess) + } + + #[inline] + fn peer_or_create(self, peer_id: &PeerId) -> Self::PeerAccess { + let hash = peer_id.clone().as_bytes().to_base58(); + JsonPeerstoreAccess(self.store.lock_or_create(hash.into())) + } + + fn peers(self) -> Self::PeersIter { + let query = self.store.query(Query { + prefix: "".into(), + filters: vec![], + orders: vec![], + skip: 0, + limit: u64::max_value(), + keys_only: true, + }); + + let list = query.filter_map(|(key, _)| { + // We filter out invalid elements. This can happen if the JSON storage file was + // corrupted or manually modified by the user. + match key.from_base58() { + Ok(bytes) => Multihash::decode_bytes(bytes).ok(), + Err(_) => return None, + } + }) + .collect() + .wait(); // Wait can never block for the JSON datastore. + + // Need to handle I/O errors. Again we just ignore. + if let Ok(list) = list { + Box::new(list.into_iter()) as Box<_> + } else { + Box::new(iter::empty()) as Box<_> + } + } +} + +pub struct JsonPeerstoreAccess<'a>(JsonFileDatastoreEntry<'a, PeerInfo>); + +impl<'a> PeerAccess for JsonPeerstoreAccess<'a> { + type AddrsIter = VecIntoIter; + + #[inline] + fn addrs(&self) -> Self::AddrsIter { + self.0.addrs().cloned().collect::>().into_iter() + } + + #[inline] + fn add_addr(&mut self, addr: Multiaddr, ttl: TTL) { + self.0.add_addr(addr, ttl, AddAddrBehaviour::IgnoreTtlIfInferior); + } + + #[inline] + fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL) { + self.0.add_addr(addr, ttl, AddAddrBehaviour::OverwriteTtl); + } + + #[inline] + fn clear_addrs(&mut self) { + self.0.set_addrs(iter::empty()); + } + + #[inline] + fn get_pub_key(&self) -> Option<&[u8]> { + self.0.public_key() + } + + #[inline] + fn set_pub_key(&mut self, key: Vec) { + self.0.set_public_key(key); + } +} + +#[cfg(test)] +mod tests { + extern crate tempfile; + peerstore_tests!( + {::json_peerstore::JsonPeerstore::new(temp_file.path()).unwrap()} + {let temp_file = self::tempfile::NamedTempFile::new().unwrap()} + ); +} diff --git a/libp2p-peerstore/src/lib.rs b/libp2p-peerstore/src/lib.rs index 9bf01afd..78454e21 100644 --- a/libp2p-peerstore/src/lib.rs +++ b/libp2p-peerstore/src/lib.rs @@ -1,9 +1,55 @@ -#[macro_use] extern crate error_chain; -extern crate multiaddr; -extern crate libp2p_peer as peer; +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. -mod memory_peerstore; +//! # Peerstore +//! +//! The `peerstore` crate allows one to store information about a peer. +//! It is similar to a key-value database, where the keys are multihashes (generally the hash of +//! the public key of the peer, but that is not enforced by this crate) and the values are the +//! public key and a list of multiaddresses with a time-to-live. +//! +//! This crate consists in a generic `Peerstore` trait and various backends. +//! +//! Note that the peerstore implementations do not consider information inside a peer store to be +//! critical. In case of an error (eg. corrupted file, disk error, etc.) they will prefer to lose +//! data rather than returning the error. + +extern crate base58; +extern crate datastore; +extern crate futures; +extern crate multiaddr; +extern crate multihash; +extern crate owning_ref; +extern crate serde; +#[macro_use] +extern crate serde_derive; + +pub use self::peerstore::{Peerstore, PeerAccess}; + +#[macro_use] +mod peerstore_tests; + +pub mod json_peerstore; +pub mod memory_peerstore; mod peerstore; mod peer_info; +pub type PeerId = multihash::Multihash; pub type TTL = std::time::Duration; diff --git a/libp2p-peerstore/src/memory_peerstore.rs b/libp2p-peerstore/src/memory_peerstore.rs index f944881f..4c6c9ce4 100644 --- a/libp2p-peerstore/src/memory_peerstore.rs +++ b/libp2p-peerstore/src/memory_peerstore.rs @@ -1,132 +1,123 @@ -use std::collections::{HashMap, HashSet}; -use std::collections::hash_map; -use multiaddr::Multiaddr; -use peer::PeerId; -use peer_info::PeerInfo; +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the `Peerstore` trait that simple stores peers in memory. + use super::TTL; -use peerstore::*; +use PeerId; +use multiaddr::Multiaddr; +use multihash::Multihash; +use owning_ref::OwningRefMut; +use peer_info::{PeerInfo, AddAddrBehaviour}; +use peerstore::{Peerstore, PeerAccess}; +use std::collections::HashMap; +use std::iter; +use std::sync::{Mutex, MutexGuard}; +use std::vec::IntoIter as VecIntoIter; -pub struct MemoryPeerstore { - store: HashMap>, +/// Implementation of the `Peerstore` trait that simply stores the peer information in memory. +pub struct MemoryPeerstore { + store: Mutex>, } -impl MemoryPeerstore { - pub fn new() -> MemoryPeerstore { - MemoryPeerstore { - store: HashMap::new(), - } +impl MemoryPeerstore { + /// Initializes a new `MemoryPeerstore`. The database is initially empty. + #[inline] + pub fn empty() -> MemoryPeerstore { + MemoryPeerstore { store: Mutex::new(HashMap::new()) } } } -impl Peerstore for MemoryPeerstore { - fn add_peer(&mut self, peer_id: PeerId, peer_info: PeerInfo) { - self.store.insert(peer_id, peer_info); +impl Default for MemoryPeerstore { + #[inline] + fn default() -> MemoryPeerstore { + MemoryPeerstore::empty() } - /// Returns a list of peers in this Peerstore - fn peers(&self) -> Vec<&PeerId> { - // this is terrible but I honestly can't think of any other way than to hand off ownership - // through this type of allocation or handing off the entire hashmap and letting people do what they - // want with that - self.store.keys().collect() - } - /// Returns the PeerInfo for a specific peer in this peer store, or None if it doesn't exist. - fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { - self.store.get(peer_id) +} + +impl<'a> Peerstore for &'a MemoryPeerstore { + type PeerAccess = MemoryPeerstoreAccess<'a>; + type PeersIter = VecIntoIter; + + fn peer(self, peer_id: &PeerId) -> Option { + let lock = self.store.lock().unwrap(); + OwningRefMut::new(lock) + .try_map_mut(|n| n.get_mut(peer_id).ok_or(())) + .ok() + .map(MemoryPeerstoreAccess) } - /// Try to get a property for a given peer - fn get_data(&self, peer_id: &PeerId, key: &str) -> Option<&T> { - match self.store.get(peer_id) { - None => None, - Some(peer_info) => peer_info.get_data(key), - } - } - /// Set a property for a given peer - fn put_data(&mut self, peer_id: &PeerId, key: String, val: T) { - match self.store.get_mut(peer_id) { - None => (), - Some(mut peer_info) => { - peer_info.set_data(key, val); - }, - } + fn peer_or_create(self, peer_id: &PeerId) -> Self::PeerAccess { + let lock = self.store.lock().unwrap(); + let r = OwningRefMut::new(lock) + .map_mut(|n| n.entry(peer_id.clone()).or_insert_with(|| PeerInfo::new())); + MemoryPeerstoreAccess(r) } - /// Adds an address to a peer - fn add_addr(&mut self, peer_id: &PeerId, addr: Multiaddr, ttl: TTL) { - match self.store.get_mut(peer_id) { - None => (), - Some(peer_info) => peer_info.add_addr(addr), - } + fn peers(self) -> Self::PeersIter { + let lock = self.store.lock().unwrap(); + lock.keys().cloned().collect::>().into_iter() + } +} + +// Note: Rust doesn't provide a `MutexGuard::map` method, otherwise we could directly store a +// `MutexGuard<'a, (&'a Multihash, &'a PeerInfo)>`. +pub struct MemoryPeerstoreAccess<'a>(OwningRefMut>, PeerInfo>); + +impl<'a> PeerAccess for MemoryPeerstoreAccess<'a> { + type AddrsIter = VecIntoIter; + + #[inline] + fn addrs(&self) -> Self::AddrsIter { + self.0.addrs().cloned().collect::>().into_iter() } - // AddAddrs gives AddrManager addresses to use, with a given ttl - // (time-to-live), after which the address is no longer valid. - // If the manager has a longer TTL, the operation is a no-op for that address - fn add_addrs(&mut self, peer_id: &PeerId, addrs: Vec, ttl: TTL) { - match self.store.get_mut(peer_id) { - None => (), - Some(peer_info) => { - for addr in addrs { - peer_info.add_addr(addr) - } - }, - } + #[inline] + fn add_addr(&mut self, addr: Multiaddr, ttl: TTL) { + self.0.add_addr(addr, ttl, AddAddrBehaviour::IgnoreTtlIfInferior); } - // SetAddr calls mgr.SetAddrs(p, addr, ttl) - fn set_addr(&mut self, peer_id: &PeerId, addr: Multiaddr, ttl: TTL) { - self.set_addrs(peer_id, vec![addr], ttl) + #[inline] + fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL) { + self.0.add_addr(addr, ttl, AddAddrBehaviour::OverwriteTtl); } - // SetAddrs sets the ttl on addresses. This clears any TTL there previously. - // This is used when we receive the best estimate of the validity of an address. - fn set_addrs(&mut self, peer_id: &PeerId, addrs: Vec, ttl: TTL) { - match self.store.get_mut(peer_id) { - None => (), - Some(peer_info) => peer_info.set_addrs(addrs), - } + #[inline] + fn clear_addrs(&mut self) { + self.0.set_addrs(iter::empty()); } - /// Returns all known (and valid) addresses for a given peer - fn addrs(&self, peer_id: &PeerId) -> &[Multiaddr] { - match self.store.get(peer_id) { - None => &[], - Some(peer_info) => peer_info.get_addrs(), - } + #[inline] + fn get_pub_key(&self) -> Option<&[u8]> { + self.0.public_key() } - /// Removes all previously stored addresses - fn clear_addrs(&mut self, peer_id: &PeerId) { - match self.store.get_mut(peer_id) { - None => (), - Some(peer_info) => peer_info.set_addrs(vec![]), - } - } - - /// Get public key for a peer - fn get_pub_key(&self, peer_id: &PeerId) -> Option<&[u8]> { - self.store.get(peer_id).map(|peer_info| peer_info.get_public_key()) - } - - /// Set public key for a peer - fn set_pub_key(&mut self, peer_id: &PeerId, key: Vec) { - self.store.get_mut(peer_id).map(|peer_info| peer_info.set_public_key(key)); + #[inline] + fn set_pub_key(&mut self, key: Vec) { + self.0.set_public_key(key); } } #[cfg(test)] mod tests { - use peer::PeerId; - use super::{PeerInfo, Peerstore, MemoryPeerstore}; - - #[test] - fn insert_get_and_list() { - let peer_id = PeerId::new(vec![1,2,3]); - let peer_info = PeerInfo::new(); - let mut peer_store: MemoryPeerstore = MemoryPeerstore::new(); - peer_store.add_peer(peer_id.clone(), peer_info); - peer_store.put_data(&peer_id, "test".into(), 123u8).unwrap(); - let got = peer_store.get_data(&peer_id, "test").expect("should be able to fetch"); - assert_eq!(*got, 123u8); - } + peerstore_tests!({ + ::memory_peerstore::MemoryPeerstore::empty() + }); } diff --git a/libp2p-peerstore/src/peer_info.rs b/libp2p-peerstore/src/peer_info.rs index 37a08fb4..ff23d3af 100644 --- a/libp2p-peerstore/src/peer_info.rs +++ b/libp2p-peerstore/src/peer_info.rs @@ -1,41 +1,195 @@ -use std::time; -use std::collections::{HashMap, HashSet}; -use multiaddr::Multiaddr; +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. -pub struct PeerInfo { - public_key: Vec, - addrs: Vec, - data: HashMap, +//! The objective of the `peerstore` crate is to provide a key-value storage. Keys are peer IDs, +//! and the `PeerInfo` struct in this module is the value. +//! +//! Note that the `PeerInfo` struct implements `PartialOrd` so that it can be stored in a +//! `Datastore`. This operation currently simply compares the public keys of the `PeerInfo`s. +//! If the `PeerInfo` struct ever gets exposed to the public API of the crate, we may want to give +//! more thoughts about this. + +use TTL; +use multiaddr::Multiaddr; +use serde::{Serialize, Deserialize, Serializer, Deserializer}; +use serde::de::Error as DeserializerError; +use serde::ser::SerializeStruct; +use std::cmp::Ordering; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +/// Information about a peer. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct PeerInfo { + // Adresses, and the time at which they will be considered expired. + addrs: Vec<(Multiaddr, SystemTime)>, + public_key: Option>, } -impl PeerInfo { - pub fn new() -> PeerInfo { - PeerInfo { - public_key: vec![], - addrs: vec![], - data: HashMap::new(), +impl PeerInfo { + /// Builds a new empty `PeerInfo`. + #[inline] + pub fn new() -> PeerInfo { + PeerInfo { addrs: vec![], public_key: None } + } + + /// Returns the list of the non-expired addresses stored in this `PeerInfo`. + /// + /// > **Note**: Keep in mind that this function is racy because addresses can expire between + /// > the moment when you get them and the moment when you process them. + // TODO: use -> impl Iterator eventually + #[inline] + pub fn addrs<'a>(&'a self) -> Box + 'a> { + let now = SystemTime::now(); + Box::new(self.addrs.iter().filter_map(move |&(ref addr, ref expires)| if *expires >= now { + Some(addr) + } else { + None + })) + } + + /// Sets the list of addresses and their time-to-live. + /// + /// This removes all previously-stored addresses. + #[inline] + pub fn set_addrs(&mut self, addrs: I) + where I: IntoIterator + { + let now = SystemTime::now(); + self.addrs = addrs.into_iter().map(move |(addr, ttl)| (addr, now + ttl)).collect(); + } + + /// Adds a single address and its time-to-live. + /// + /// If the peer info already knows about that address but with a longer TTL, then the operation + /// is a no-op. + pub fn add_addr(&mut self, addr: Multiaddr, ttl: TTL, behaviour: AddAddrBehaviour) { + let expires = SystemTime::now() + ttl; + + if let Some(&mut (_, ref mut existing_expires)) = + self.addrs.iter_mut().find(|&&mut (ref a, _)| a == &addr) + { + if behaviour == AddAddrBehaviour::OverwriteTtl || *existing_expires < expires { + *existing_expires = expires; + } + return; + } + + self.addrs.push((addr, expires)); + } + + /// Sets the public key stored in this `PeerInfo`. + #[inline] + pub fn set_public_key(&mut self, key: Vec) { + self.public_key = Some(key); + } + + /// Returns the public key stored in this `PeerInfo`, if any. + #[inline] + pub fn public_key(&self) -> Option<&[u8]> { + self.public_key.as_ref().map(|k| &**k) + } +} + +/// Behaviour of the `add_addr` function. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum AddAddrBehaviour { + /// Always overwrite the existing TTL. + OverwriteTtl, + /// Don't overwrite if the TTL is larger. + IgnoreTtlIfInferior, +} + +impl Serialize for PeerInfo { + fn serialize(&self, serializer: S) -> Result + where S: Serializer + { + let mut s = serializer.serialize_struct("PeerInfo", 2)?; + s.serialize_field( + "addrs", + &self.addrs + .iter() + .map(|&(ref addr, ref expires)| { + let addr = addr.to_bytes(); + let from_epoch = expires.duration_since(UNIX_EPOCH) + // This `unwrap_or` case happens if the user has their system time set to + // before EPOCH. Times-to-live will be be longer than expected, but it's a very + // improbable corner case and is not attackable in any way, so we don't really + // care. + .unwrap_or(Duration::new(0, 0)); + let secs = from_epoch.as_secs() + .saturating_mul(1_000) + .saturating_add(from_epoch.subsec_nanos() as u64 / 1_000_000); + (addr, secs) + }) + .collect::>(), + )?; + s.serialize_field("public_key", &self.public_key)?; + s.end() + } +} + +impl<'de> Deserialize<'de> for PeerInfo { + fn deserialize(deserializer: D) -> Result + where D: Deserializer<'de> + { + // We deserialize to an intermdiate struct first, then turn that struct into a `PeerInfo`. + let interm = { + #[derive(Deserialize)] + struct Interm { + addrs: Vec<(String, u64)>, + public_key: Option>, + } + Interm::deserialize(deserializer)? + }; + + let addrs = { + let mut out = Vec::with_capacity(interm.addrs.len()); + for (addr, since_epoch) in interm.addrs { + let addr = match Multiaddr::new(&addr) { + Ok(a) => a, + Err(err) => return Err(DeserializerError::custom(err)), + }; + let expires = UNIX_EPOCH + Duration::from_millis(since_epoch); + out.push((addr, expires)); + } + out + }; + + Ok(PeerInfo { + addrs: addrs, + public_key: interm.public_key, + }) + } +} + +impl PartialOrd for PeerInfo { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + // See module-level comment. + match (&self.public_key, &other.public_key) { + (&Some(ref my_pub), &Some(ref other_pub)) => { + Some(my_pub.cmp(other_pub)) + } + _ => { + None + } } } - pub fn get_public_key(&self) -> &[u8] { - &self.public_key - } - pub fn set_public_key(&mut self, key: Vec) { - self.public_key = key; - } - pub fn get_addrs(&self) -> &[Multiaddr] { - &self.addrs - } - pub fn set_addrs(&mut self, addrs: Vec) { - self.addrs = addrs; - } - pub fn add_addr(&mut self, addr: Multiaddr) { - self.addrs.push(addr); // TODO: This is stupid, a more advanced thing using TTLs need to be implemented - self.addrs.dedup(); - } - pub fn get_data(&self, key: &str) -> Option<&T> { - self.data.get(key) - } - pub fn set_data(&mut self, key: String, val: T) -> Option { - self.data.insert(key, val) - } } diff --git a/libp2p-peerstore/src/peerstore.rs b/libp2p-peerstore/src/peerstore.rs index d85de7d9..0e08ecb0 100644 --- a/libp2p-peerstore/src/peerstore.rs +++ b/libp2p-peerstore/src/peerstore.rs @@ -1,48 +1,104 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use {PeerId, TTL}; use multiaddr::Multiaddr; -use peer::PeerId; -use peer_info::PeerInfo; -use super::TTL; -pub trait Peerstore { - /// Add a peer to this peer store - fn add_peer(&mut self, peer_id: PeerId, peer_info: PeerInfo); +/// Implemented on objects that store peers. +/// +/// Note that the methods of this trait take by ownership (ie. `self` instead of `&self` or +/// `&mut self`). This was made so that the associated types could hold `self` internally and +/// because Rust doesn't have higher-ranked trait bounds yet. +/// +/// Therefore this trait should likely be implemented on `&'a ConcretePeerstore` instead of +/// on `ConcretePeerstore`. +pub trait Peerstore { + /// Grants access to the a peer inside the peer store. + type PeerAccess: PeerAccess; + /// List of the peers in this peer store. + type PeersIter: Iterator; - /// Returns a list of peers in this Peerstore - fn peers(&self) -> Vec<&PeerId>; + /// Grants access to a peer by its ID. + fn peer(self, peer_id: &PeerId) -> Option; - /// Returns the PeerInfo for a specific peer in this peer store, or None if it doesn't exist. - fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo>; + /// Grants access to a peer by its ID or creates it. + fn peer_or_create(self, peer_id: &PeerId) -> Self::PeerAccess; - /// Try to get a property for a given peer - fn get_data(&self, peer_id: &PeerId, key: &str) -> Option<&T>; - - /// Set a property for a given peer - fn put_data(&mut self, peer_id: &PeerId, key: String, val: T); - - /// Adds an address to a peer - fn add_addr(&mut self, peer_id: &PeerId, addr: Multiaddr, ttl: TTL); - - // AddAddrs gives AddrManager addresses to use, with a given ttl - // (time-to-live), after which the address is no longer valid. - // If the manager has a longer TTL, the operation is a no-op for that address - fn add_addrs(&mut self, peer_id: &PeerId, addrs: Vec, ttl: TTL); - - // SetAddr calls mgr.SetAddrs(p, addr, ttl) - fn set_addr(&mut self, peer_id: &PeerId, addr: Multiaddr, ttl: TTL); - - // SetAddrs sets the ttl on addresses. This clears any TTL there previously. - // This is used when we receive the best estimate of the validity of an address. - fn set_addrs(&mut self, peer_id: &PeerId, addrs: Vec, ttl: TTL); - - /// Returns all known (and valid) addresses for a given peer - fn addrs(&self, peer_id: &PeerId) -> &[Multiaddr]; - - /// Removes all previously stored addresses - fn clear_addrs(&mut self, peer_id: &PeerId); - - /// Get public key for a peer - fn get_pub_key(&self, peer_id: &PeerId) -> Option<&[u8]>; - - /// Set public key for a peer - fn set_pub_key(&mut self, peer_id: &PeerId, key: Vec); + /// Returns a list of peers in this peer store. + /// + /// Keep in mind that the trait implementation may allow new peers to be added or removed at + /// any time. If that is the case, you have to take into account that this is only an + /// indication. + fn peers(self) -> Self::PeersIter; +} + +/// Implemented on objects that represent an open access to a peer stored in a peer store. +/// +/// The exact semantics of "open access" depend on the trait implementation. +pub trait PeerAccess { + /// Iterator returned by `addrs`. + type AddrsIter: Iterator; + + /// Returns all known and non-expired addresses for a given peer. + /// + /// > **Note**: Keep in mind that this function is racy because addresses can expire between + /// > the moment when you get them and the moment when you process them. + fn addrs(&self) -> Self::AddrsIter; + + /// Adds an address to a peer. + /// + /// If the manager already has this address stored and with a longer TTL, then the operation + /// is a no-op. + fn add_addr(&mut self, addr: Multiaddr, ttl: TTL); + + // Similar to calling `add_addr` multiple times in a row. + #[inline] + fn add_addrs(&mut self, addrs: I, ttl: TTL) + where I: IntoIterator + { + for addr in addrs.into_iter() { + self.add_addr(addr, ttl); + } + } + + /// Sets the TTL of an address of a peer. Adds the address if it is currently unknown. + /// + /// Contrary to `add_addr`, this operation is never a no-op. + #[inline] + fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL); + + // Similar to calling `set_addr_ttl` multiple times in a row. + fn set_addrs_ttl(&mut self, addrs: I, ttl: TTL) + where I: IntoIterator + { + for addr in addrs.into_iter() { + self.add_addr(addr, ttl); + } + } + + /// Removes all previously stored addresses. + fn clear_addrs(&mut self); + + /// Get the public key for the peer, if known. + fn get_pub_key(&self) -> Option<&[u8]>; + + /// Set public key for the peer. + fn set_pub_key(&mut self, key: Vec); } diff --git a/libp2p-peerstore/src/peerstore_tests.rs b/libp2p-peerstore/src/peerstore_tests.rs new file mode 100644 index 00000000..7833aa6a --- /dev/null +++ b/libp2p-peerstore/src/peerstore_tests.rs @@ -0,0 +1,139 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Contains the `peerstore_tests!` macro which will test an implementation of `Peerstore`. +//! You need to pass as first parameter a way to create the `Peerstore` that you want to test. +//! +//! You can also pass as additional parameters a list of statements that will be inserted before +//! each test. This allows you to have the peerstore builder use variables created by these +//! statements. + +#![cfg(test)] + +macro_rules! peerstore_tests { + ({$create_peerstore:expr} $({$stmt:stmt})*) => { + use std::thread; + use std::time::Duration; + use {Peerstore, PeerAccess}; + use multiaddr::Multiaddr; + use multihash::Multihash; + + #[test] + fn initially_empty() { + $($stmt;)* + let peer_store = $create_peerstore; + let peer_id = Multihash::encode_bytes(0, vec![1, 2, 3]).unwrap(); + assert_eq!(peer_store.peers().count(), 0); + assert!(peer_store.peer(&peer_id).is_none()); + } + + #[test] + fn set_pub_key_then_retreive() { + $($stmt;)* + let peer_store = $create_peerstore; + let peer_id = Multihash::encode_bytes(0, vec![1, 2, 3]).unwrap(); + + peer_store.peer_or_create(&peer_id).set_pub_key(vec![9, 8, 7]); + + assert_eq!(peer_store.peer(&peer_id).unwrap().get_pub_key().unwrap(), &[9, 8, 7]); + + assert_eq!(peer_store.peers().collect::>(), &[peer_id.clone()]); + } + + #[test] + fn set_then_get_addr() { + $($stmt;)* + let peer_store = $create_peerstore; + let peer_id = Multihash::encode_bytes(0, vec![1, 2, 3]).unwrap(); + let addr = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); + + peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(5000)); + + let addrs = peer_store.peer(&peer_id).unwrap().addrs().collect::>(); + assert_eq!(addrs, &[addr]); + } + + #[test] + fn add_expired_addr() { + // Add an already-expired address to a peer. + $($stmt;)* + let peer_store = $create_peerstore; + let peer_id = Multihash::encode_bytes(0, vec![1, 2, 3]).unwrap(); + let addr = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); + + peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(0)); + thread::sleep(Duration::from_millis(2)); + + let addrs = peer_store.peer(&peer_id).unwrap().addrs(); + assert_eq!(addrs.count(), 0); + } + + #[test] + fn clear_addrs() { + $($stmt;)* + let peer_store = $create_peerstore; + let peer_id = Multihash::encode_bytes(0, vec![1, 2, 3]).unwrap(); + let addr = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); + + peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(5000)); + peer_store.peer(&peer_id).unwrap().clear_addrs(); + + let addrs = peer_store.peer(&peer_id).unwrap().addrs(); + assert_eq!(addrs.count(), 0); + } + + #[test] + fn no_update_ttl() { + $($stmt;)* + let peer_store = $create_peerstore; + let peer_id = Multihash::encode_bytes(0, vec![1, 2, 3]).unwrap(); + + let addr1 = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); + let addr2 = Multiaddr::new("/ip4/0.0.0.1/tcp/0").unwrap(); + + peer_store.peer_or_create(&peer_id).add_addr(addr1.clone(), Duration::from_millis(5000)); + peer_store.peer_or_create(&peer_id).add_addr(addr2.clone(), Duration::from_millis(5000)); + assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 2); + + // `add_addr` must not overwrite the TTL because it's already higher + peer_store.peer_or_create(&peer_id).add_addr(addr1.clone(), Duration::from_millis(0)); + thread::sleep(Duration::from_millis(2)); + assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 2); + } + + #[test] + fn force_update_ttl() { + $($stmt;)* + let peer_store = $create_peerstore; + let peer_id = Multihash::encode_bytes(0, vec![1, 2, 3]).unwrap(); + + let addr1 = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); + let addr2 = Multiaddr::new("/ip4/0.0.0.1/tcp/0").unwrap(); + + peer_store.peer_or_create(&peer_id).add_addr(addr1.clone(), Duration::from_millis(5000)); + peer_store.peer_or_create(&peer_id).add_addr(addr2.clone(), Duration::from_millis(5000)); + assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 2); + + peer_store.peer_or_create(&peer_id).set_addr_ttl(addr1.clone(), Duration::from_millis(0)); + thread::sleep(Duration::from_millis(2)); + assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 1); + } + }; +} diff --git a/multihash/src/lib.rs b/multihash/src/lib.rs index 7353522c..546c8cd8 100644 --- a/multihash/src/lib.rs +++ b/multihash/src/lib.rs @@ -1,3 +1,23 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + extern crate hex; extern crate byteorder; @@ -77,7 +97,7 @@ pub enum Error { /// (for instance) a network stream. The format is as follows: /// /// See the spec for more information. -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] pub struct Multihash { /// The code of the hash algorithm in question code: u64,