From 9e0f110e47b49c57cc4eb471a53cae06b73e53aa Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 4 Dec 2018 14:52:14 +0100 Subject: [PATCH] Remove relay, peerstore and datastore (#723) --- Cargo.toml | 5 - protocols/identify/Cargo.toml | 1 - protocols/identify/src/lib.rs | 1 - protocols/kad/Cargo.toml | 1 - protocols/kad/src/lib.rs | 1 - src/lib.rs | 2 - stores/datastore/Cargo.toml | 13 - stores/datastore/src/json_file.rs | 336 ---------- stores/datastore/src/lib.rs | 177 ----- stores/datastore/src/query.rs | 350 ---------- stores/peerstore/Cargo.toml | 20 - stores/peerstore/src/json_peerstore.rs | 171 ----- stores/peerstore/src/lib.rs | 94 --- stores/peerstore/src/memory_peerstore.rs | 123 ---- stores/peerstore/src/peer_info.rs | 139 ---- stores/peerstore/src/peerstore.rs | 116 ---- stores/peerstore/src/peerstore_tests.rs | 132 ---- transports/relay/Cargo.toml | 19 - transports/relay/src/copy.rs | 139 ---- transports/relay/src/error.rs | 76 --- transports/relay/src/lib.rs | 42 -- transports/relay/src/message.proto | 42 -- transports/relay/src/message.rs | 803 ----------------------- transports/relay/src/protocol.rs | 322 --------- transports/relay/src/transport.rs | 222 ------- transports/relay/src/utility.rs | 183 ------ 26 files changed, 3530 deletions(-) delete mode 100644 stores/datastore/Cargo.toml delete mode 100644 stores/datastore/src/json_file.rs delete mode 100644 stores/datastore/src/lib.rs delete mode 100644 stores/datastore/src/query.rs delete mode 100644 stores/peerstore/Cargo.toml delete mode 100644 stores/peerstore/src/json_peerstore.rs delete mode 100644 stores/peerstore/src/lib.rs delete mode 100644 stores/peerstore/src/memory_peerstore.rs delete mode 100644 stores/peerstore/src/peer_info.rs delete mode 100644 stores/peerstore/src/peerstore.rs delete mode 100644 stores/peerstore/src/peerstore_tests.rs delete mode 100644 transports/relay/Cargo.toml delete mode 100644 transports/relay/src/copy.rs delete mode 100644 transports/relay/src/error.rs delete mode 100644 transports/relay/src/lib.rs delete mode 100644 transports/relay/src/message.proto delete mode 100644 transports/relay/src/message.rs delete mode 100644 transports/relay/src/protocol.rs delete mode 100644 transports/relay/src/transport.rs delete mode 100644 transports/relay/src/utility.rs diff --git a/Cargo.toml b/Cargo.toml index d68027ce..3a875541 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,11 +18,9 @@ libp2p-mplex = { path = "./muxers/mplex" } libp2p-identify = { path = "./protocols/identify" } libp2p-kad = { path = "./protocols/kad" } libp2p-floodsub = { path = "./protocols/floodsub" } -libp2p-peerstore = { path = "./stores/peerstore" } libp2p-ping = { path = "./protocols/ping" } libp2p-plaintext = { path = "./protocols/plaintext" } libp2p-ratelimit = { path = "./transports/ratelimit" } -libp2p-relay = { path = "./transports/relay" } libp2p-core = { path = "./core" } libp2p-core-derive = { path = "./misc/core-derive" } libp2p-secio = { path = "./protocols/secio", default-features = false } @@ -67,11 +65,8 @@ members = [ "protocols/ping", "protocols/plaintext", "protocols/secio", - "stores/datastore", - "stores/peerstore", "transports/dns", "transports/ratelimit", - "transports/relay", "transports/tcp", "transports/timeout", "transports/uds", diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 822c3ee6..a7692c68 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -8,7 +8,6 @@ license = "MIT" bytes = "0.4" fnv = "1" futures = "0.1" -libp2p-peerstore = { path = "../../stores/peerstore" } libp2p-core = { path = "../../core" } log = "0.4.1" multiaddr = { path = "../../misc/multiaddr" } diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index 52e898bc..ee50ddc0 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -69,7 +69,6 @@ extern crate bytes; extern crate fnv; #[macro_use] extern crate futures; -extern crate libp2p_peerstore; extern crate libp2p_core; extern crate log; extern crate multiaddr; diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 0b2448bc..0758bb73 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -9,7 +9,6 @@ arrayvec = "0.4.7" bs58 = "0.2.0" bigint = "4.2" bytes = "0.4" -datastore = { path = "../../stores/datastore" } fnv = "1.0" futures = "0.1" libp2p-identify = { path = "../../protocols/identify" } diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 3af65768..0ab72964 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -60,7 +60,6 @@ extern crate arrayvec; extern crate bigint; extern crate bs58; extern crate bytes; -extern crate datastore; extern crate fnv; #[cfg_attr(test, macro_use)] extern crate futures; diff --git a/src/lib.rs b/src/lib.rs index 3aebc9f2..8547d1c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -148,11 +148,9 @@ pub extern crate libp2p_floodsub as floodsub; pub extern crate libp2p_mplex as mplex; #[cfg(not(target_os = "emscripten"))] pub extern crate libp2p_mdns as mdns; -pub extern crate libp2p_peerstore as peerstore; pub extern crate libp2p_ping as ping; pub extern crate libp2p_plaintext as plaintext; pub extern crate libp2p_ratelimit as ratelimit; -pub extern crate libp2p_relay as relay; pub extern crate libp2p_secio as secio; #[cfg(not(target_os = "emscripten"))] pub extern crate libp2p_tcp_transport as tcp; diff --git a/stores/datastore/Cargo.toml b/stores/datastore/Cargo.toml deleted file mode 100644 index 3ca0ad2b..00000000 --- a/stores/datastore/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "datastore" -version = "0.1.0" -authors = ["Parity Technologies "] -license = "MIT" - -[dependencies] -base64 = "0.7" -chashmap = "2.2" -futures = "0.1" -serde = "1.0" -serde_json = "1.0" -tempfile = "3" diff --git a/stores/datastore/src/json_file.rs b/stores/datastore/src/json_file.rs deleted file mode 100644 index 2c700fa9..00000000 --- a/stores/datastore/src/json_file.rs +++ /dev/null @@ -1,336 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Implementation of `Datastore` that uses a single plain JSON file for storage. - -use Datastore; -use chashmap::{CHashMap, WriteGuard}; -use futures::Future; -use futures::stream::{iter_ok, Stream}; -use query::{naive_apply_query, Query}; -use serde::Serialize; -use serde::de::DeserializeOwned; -use serde_json::value::Value; -use serde_json::{from_reader, from_value, to_value, to_writer, Map}; -use std::borrow::Cow; -use std::fs; -use std::io::Cursor; -use std::io::Error as IoError; -use std::io::ErrorKind as IoErrorKind; -use std::io::Read; -use std::ops::{Deref, DerefMut}; -use std::path::PathBuf; -use tempfile::NamedTempFile; - -/// Implementation of `Datastore` that uses a single plain JSON file. -pub struct JsonFileDatastore -where - T: Serialize + DeserializeOwned + Clone, -{ - path: PathBuf, - content: CHashMap, -} - -impl JsonFileDatastore -where - T: Serialize + DeserializeOwned + Clone, -{ - /// Opens or creates the datastore. If the path refers to an existing path, then this function - /// will attempt to load an existing set of values from it (which can result in an error). - /// Otherwise if the path doesn't exist, the parent directory will be created and a new empty - /// datastore will be returned. - pub fn new

(path: P) -> Result, IoError> - where - P: Into, - { - let path = path.into(); - - if !path.exists() { - fs::create_dir_all(path.parent() - .expect("can only fail if root, which means that path.exists() would be true"))?; - return Ok(JsonFileDatastore { - path: path, - content: CHashMap::new(), - }); - } - - let content = { - let mut file = fs::File::open(&path)?; - - // We want to support empty files (and treat them as an empty recordset). Unfortunately - // `serde_json` will always produce an error if we do this ("unexpected EOF at line 0 - // column 0"). Therefore we start by reading one byte from the file in order to check - // for EOF. - - let mut first_byte = [0]; - if file.read(&mut first_byte)? == 0 { - // File is empty. - CHashMap::new() - } else { - match from_reader::<_, Value>(Cursor::new(first_byte).chain(file)) { - Ok(Value::Null) => CHashMap::new(), - Ok(Value::Object(map)) => { - let mut out = CHashMap::with_capacity(map.len()); - for (key, value) in map.into_iter() { - let value = match from_value(value) { - Ok(v) => v, - Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)), - }; - out.insert(key, value); - } - out - } - Ok(_) => { - return Err(IoError::new( - IoErrorKind::InvalidData, - "expected JSON object", - )); - } - Err(err) => { - return Err(IoError::new(IoErrorKind::InvalidData, err)); - } - } - } - }; - - Ok(JsonFileDatastore { - path: path, - content: content, - }) - } - - /// Flushes the content of the datastore to the disk. - /// - /// This function can only fail in case of a disk access error. If an error occurs, any change - /// to the datastore that was performed since the last successful flush will be lost. No data - /// will be corrupted. - pub fn flush(&self) -> Result<(), IoError> - where - T: Clone, - { - // Create a temporary file in the same directory as the destination, which avoids the - // problem of having a file cleaner delete our file while we use it. - let self_path_parent = self.path.parent().ok_or(IoError::new( - IoErrorKind::Other, - "couldn't get parent directory of destination", - ))?; - let mut temporary_file = NamedTempFile::new_in(self_path_parent)?; - - let content = self.content.clone().into_iter(); - to_writer( - &mut temporary_file, - &content - .map(|(k, v)| (k, to_value(v).unwrap())) - .collect::>(), - )?; - temporary_file.as_file().sync_data()?; - - // Note that `persist` will fail if we try to persist across filesystems. However that - // shouldn't happen since we created the temporary file in the same directory as the final - // path. - temporary_file.persist(&self.path)?; - Ok(()) - } -} - -impl<'a, T> Datastore for &'a JsonFileDatastore -where - T: Clone + Serialize + DeserializeOwned + Default + PartialOrd + 'static, -{ - type Entry = JsonFileDatastoreEntry<'a, T>; - type QueryResult = Box + 'a>; - - #[inline] - fn lock(self, key: Cow) -> Option { - self.content - .get_mut(&key.into_owned()) - .map(JsonFileDatastoreEntry) - } - - #[inline] - fn lock_or_create(self, key: Cow) -> Self::Entry { - loop { - self.content - .upsert(key.clone().into_owned(), || Default::default(), |_| {}); - - // There is a slight possibility that another thread will delete our value in this - // small interval. If this happens, we just loop and reinsert the value again until - // we can acquire a lock. - if let Some(v) = self.content.get_mut(&key.clone().into_owned()) { - return JsonFileDatastoreEntry(v); - } - } - } - - #[inline] - fn put(self, key: Cow, value: T) { - self.content.insert(key.into_owned(), value); - } - - #[inline] - fn get(self, key: &str) -> Option { - self.content.get(&key.to_owned()).map(|v| v.clone()) - } - - #[inline] - fn has(self, key: &str) -> bool { - self.content.contains_key(&key.to_owned()) - } - - #[inline] - fn delete(self, key: &str) -> Option { - self.content.remove(&key.to_owned()) - } - - fn query(self, query: Query) -> Self::QueryResult { - let content = self.content.clone(); - - let keys_only = query.keys_only; - - let content_stream = iter_ok(content.into_iter().filter_map(|(key, value)| { - // Skip values that are malformed. - let value = if keys_only { Default::default() } else { value }; - Some((key, value)) - })); - - // `content_stream` reads from the content of the `Mutex`, so we need to clone the data - // into a `Vec` before returning. - let collected = naive_apply_query(content_stream, query) - .collect() - .wait() - .expect( - "can only fail if either `naive_apply_query` or `content_stream` produce \ - an error, which cann't happen", - ); - let output_stream = iter_ok(collected.into_iter()); - Box::new(output_stream) as Box<_> - } -} - -impl Drop for JsonFileDatastore -where - T: Serialize + DeserializeOwned + Clone, -{ - #[inline] - fn drop(&mut self) { - // Unfortunately there's not much we can do here in case of an error, as panicking would be - // very bad. Similar to `File`, the user should take care to call `flush()` before dropping - // the datastore. - // - // If an error happens here, any change since the last successful flush will be lost, but - // the data will not be corrupted. - let _ = self.flush(); - } -} - -/// Implementation of `Datastore` that uses a single plain JSON file. -pub struct JsonFileDatastoreEntry<'a, T>(WriteGuard<'a, String, T>) -where - T: 'a; - -impl<'a, T> Deref for JsonFileDatastoreEntry<'a, T> -where - T: 'a, -{ - type Target = T; - - fn deref(&self) -> &T { - &*self.0 - } -} - -impl<'a, T> DerefMut for JsonFileDatastoreEntry<'a, T> -where - T: 'a, -{ - fn deref_mut(&mut self) -> &mut T { - &mut *self.0 - } -} - -#[cfg(test)] -mod tests { - use Datastore; - use JsonFileDatastore; - use futures::{Future, Stream}; - use tempfile::NamedTempFile; - use {Filter, FilterOp, FilterTy, Order, Query}; - - #[test] - fn open_and_flush() { - let path = NamedTempFile::new().unwrap().into_temp_path(); - - let datastore = JsonFileDatastore::>::new(&path).unwrap(); - datastore.flush().unwrap(); - } - - #[test] - fn values_store_and_reload() { - let path = NamedTempFile::new().unwrap().into_temp_path(); - - let datastore = JsonFileDatastore::>::new(&path).unwrap(); - datastore.put("foo".into(), vec![1, 2, 3]); - datastore.put("bar".into(), vec![0, 255, 127]); - datastore.flush().unwrap(); - drop(datastore); - - - let reload = JsonFileDatastore::>::new(&path).unwrap(); - assert_eq!(reload.get("bar").unwrap(), &[0, 255, 127]); - assert_eq!(reload.get("foo").unwrap(), &[1, 2, 3]); - } - - #[test] - fn query_basic() { - let path = NamedTempFile::new().unwrap().into_temp_path(); - - let datastore = JsonFileDatastore::>::new(&path).unwrap(); - datastore.put("foo1".into(), vec![6, 7, 8]); - datastore.put("foo2".into(), vec![6, 7, 8]); - datastore.put("foo3".into(), vec![7, 8, 9]); - datastore.put("foo4".into(), vec![10, 11, 12]); - datastore.put("foo5".into(), vec![13, 14, 15]); - datastore.put("bar1".into(), vec![0, 255, 127]); - datastore.flush().unwrap(); - - let query = datastore - .query(Query { - prefix: "fo".into(), - filters: vec![ - Filter { - ty: FilterTy::ValueCompare(&vec![6, 7, 8].into()), - operation: FilterOp::NotEqual, - }, - ], - orders: vec![Order::ByKeyDesc], - skip: 1, - limit: u64::max_value(), - keys_only: false, - }) - .collect() - .wait() - .unwrap(); - - assert_eq!(query[0].0, "foo4"); - assert_eq!(query[0].1, &[10, 11, 12]); - assert_eq!(query[1].0, "foo3"); - assert_eq!(query[1].1, &[7, 8, 9]); - } -} diff --git a/stores/datastore/src/lib.rs b/stores/datastore/src/lib.rs deleted file mode 100644 index 341aea2b..00000000 --- a/stores/datastore/src/lib.rs +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! General-purpose key-value storage. -//! The keys are strings, and the values are of any type you want. -//! -//! > **Note**: This crate is meant to be a utility for the implementation of other crates; it -//! > does not directly participate in the stack of libp2p. -//! -//! This crate provides the `Datastore` trait, whose template parameter is the type of the value. -//! It is implemented on types that represent a key-value storage. -//! The only available implementation for now is `JsonFileDatastore`. -//! -//! # JSON file datastore -//! -//! The `JsonFileDatastore` can provide a key-value storage that loads and stores data in a single -//! JSON file. It is only available if the value implements the `Serialize`, `DeserializeOwned` -//! and `Clone` traits. -//! -//! The `JsonFileDatastore::new` method will attempt to load existing data from the path you pass -//! as parameter. This path is also where the data will be stored. The content of the store is -//! flushed on drop or if you call `flush()`. -//! -//! ```no_run -//! use datastore::Datastore; -//! use datastore::JsonFileDatastore; -//! -//! let datastore = JsonFileDatastore::>::new("/tmp/test.json").unwrap(); -//! datastore.put("foo".into(), vec![1, 2, 3]); -//! datastore.put("bar".into(), vec![0, 255, 127]); -//! assert_eq!(datastore.get("foo").unwrap(), &[1, 2, 3]); -//! datastore.flush().unwrap(); // optional -//! ``` -//! -//! # Query -//! -//! In addition to simple operations such as `get` or `put`, the `Datastore` trait also provides -//! a way to perform queries on the key-value storage, using the `query` method. -//! -//! The struct returned by the `query` method implements the `Stream` trait from `futures`, -//! meaning that the result is asynchronous. -//! -//! > **Note**: For now the API of the `get` and `has` methods makes them potentially blocking -//! > operations, though the only available implementation doesn't block. The API of these -//! > methods may become asynchronous in the future if deemed necessary. -//! -//! ```no_run -//! extern crate datastore; -//! extern crate futures; -//! -//! # fn main() { -//! use datastore::{Query, Order, Filter, FilterTy, FilterOp}; -//! use datastore::Datastore; -//! use datastore::JsonFileDatastore; -//! use futures::{Future, Stream}; -//! -//! let datastore = JsonFileDatastore::>::new("/tmp/test.json").unwrap(); -//! let query = datastore.query(Query { -//! // Only return the keys that start with this prefix. -//! prefix: "fo".into(), -//! // List of filters for the keys and/or values. -//! filters: vec![ -//! Filter { -//! ty: FilterTy::ValueCompare(&vec![6, 7, 8].into()), -//! operation: FilterOp::NotEqual, -//! }, -//! ], -//! // Order in which to sort the results. -//! orders: vec![Order::ByKeyDesc], -//! // Number of entries to skip at the beginning of the results (after sorting). -//! skip: 1, -//! // Limit to the number of entries to return (use `u64::max_value()` for no limit). -//! limit: 12, -//! // If true, don't load the values. For optimization purposes. -//! keys_only: false, -//! }); -//! -//! let results = query.collect().wait().unwrap(); -//! println!("{:?}", results); -//! # } -//! ``` - -extern crate base64; -extern crate chashmap; -#[macro_use] -extern crate futures; -extern crate serde; -extern crate serde_json; -extern crate tempfile; - -use futures::Stream; -use std::borrow::Cow; -use std::io::Error as IoError; -use std::ops::DerefMut; - -mod json_file; -mod query; - -pub use self::json_file::{JsonFileDatastore, JsonFileDatastoreEntry}; -pub use self::query::{Filter, FilterOp, FilterTy, Order, Query}; - -/// Abstraction over any struct that can store `(key, value)` pairs. -pub trait Datastore { - /// Locked entry. - type Entry: DerefMut; - /// Output of a query. - type QueryResult: Stream; - - /// Sets the value of a key. - #[inline] - fn put(self, key: Cow, value: T) - where - Self: Sized, - { - *self.lock_or_create(key) = value; - } - - /// Checks if an entry exists, and if so locks it. - /// - /// Trying to lock a value that is already locked will block, therefore you should keep locks - /// for a duration that is as short as possible. - fn lock(self, key: Cow) -> Option; - - /// Locks an entry if it exists, or creates it otherwise. - /// - /// Same as `put` followed with `lock`, except that it is atomic. - fn lock_or_create(self, key: Cow) -> Self::Entry; - - /// Returns the value corresponding to this key by cloning it. - #[inline] - fn get(self, key: &str) -> Option - where - Self: Sized, - T: Clone, - { - self.lock(key.into()).map(|v| v.clone()) - } - - /// Returns true if the datastore contains the given key. - /// - /// > **Note**: Keep in mind that using this operation is probably racy. A secondary thread - /// > can delete a key right after you called `has()`. In other words, this function - /// > returns whether an entry with that key existed in the short past. - #[inline] - fn has(self, key: &str) -> bool - where - Self: Sized, - { - self.lock(key.into()).is_some() - } - - /// Removes the given key from the datastore. Returns the old value if the key existed. - fn delete(self, key: &str) -> Option; - - /// Executes a query on the key-value store. - /// - /// This operation is expensive on some implementations and cheap on others. It is your - /// responsibility to pick the right implementation for the right job. - fn query(self, query: Query) -> Self::QueryResult; -} diff --git a/stores/datastore/src/query.rs b/stores/datastore/src/query.rs deleted file mode 100644 index f7062279..00000000 --- a/stores/datastore/src/query.rs +++ /dev/null @@ -1,350 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use futures::stream::{iter_ok, Skip as StreamSkip, Take as StreamTake}; -use futures::{Async, Future, Poll, Stream}; -use std::borrow::Cow; -use std::cmp::Ordering; -use std::io::Error as IoError; -use std::marker::PhantomData; -use std::vec::IntoIter as VecIntoIter; - -/// Description of a query to apply on a datastore. -/// -/// The various modifications of the dataset are applied in the same order as the fields (prefix, -/// filters, orders, skip, limit). -#[derive(Debug, Clone)] -pub struct Query<'a, T: 'a> { - /// Only the keys that start with `prefix` will be returned. - pub prefix: Cow<'a, str>, - /// Filters to apply on the results. - pub filters: Vec>, - /// How to order the keys. Applied sequentially. - pub orders: Vec, - /// Number of elements to skip from at the start of the results. - pub skip: u64, - /// Maximum number of elements in the results. - pub limit: u64, - /// Only return keys. If true, then all the `Vec`s of the data will be empty. - pub keys_only: bool, -} - -/// A filter to apply to the results set. -#[derive(Debug, Clone)] -pub struct Filter<'a, T: 'a> { - /// Type of filter and value to compare with. - pub ty: FilterTy<'a, T>, - /// Comparison operation. - pub operation: FilterOp, -} - -/// Type of filter and value to compare with. -#[derive(Debug, Clone)] -pub enum FilterTy<'a, T: 'a> { - /// Compare the key with a reference value. - KeyCompare(Cow<'a, str>), - /// Compare the value with a reference value. - ValueCompare(&'a T), -} - -/// Filtering operation. -#[derive(Debug, Copy, Clone)] -pub enum FilterOp { - Equal, - NotEqual, - Less, - LessOrEqual, - Greater, - GreaterOrEqual, -} - -/// Order in which to sort the results of a query. -#[derive(Debug, Copy, Clone)] -pub enum Order { - /// Put the values in ascending order. - ByValueAsc, - /// Put the values in descending order. - ByValueDesc, - /// Put the keys in ascending order. - ByKeyAsc, - /// Put the keys in descending order. - ByKeyDesc, -} - -/// Naively applies a query on a set of results. -pub fn naive_apply_query<'a, S, V>( - stream: S, - query: Query<'a, V>, -) -> StreamTake< - StreamSkip< - NaiveKeysOnlyApply< - NaiveApplyOrdered< - NaiveFiltersApply<'a, NaivePrefixApply<'a, S>, VecIntoIter>>, - V, - >, - >, - >, -> -where - S: Stream + 'a, - V: Clone + PartialOrd + Default + 'static, -{ - let prefixed = naive_apply_prefix(stream, query.prefix); - let filtered = naive_apply_filters(prefixed, query.filters.into_iter()); - let ordered = naive_apply_ordered(filtered, query.orders); - let keys_only = naive_apply_keys_only(ordered, query.keys_only); - naive_apply_skip_limit(keys_only, query.skip, query.limit) -} - -/// Skips the `skip` first element of a stream and only returns `limit` elements. -#[inline] -pub fn naive_apply_skip_limit(stream: S, skip: u64, limit: u64) -> StreamTake> -where - S: Stream, -{ - stream.skip(skip).take(limit) -} - -/// Filters the result of a stream to empty values if `keys_only` is true. -#[inline] -pub fn naive_apply_keys_only(stream: S, keys_only: bool) -> NaiveKeysOnlyApply -where - S: Stream, -{ - NaiveKeysOnlyApply { - keys_only: keys_only, - stream: stream, - } -} - -/// Returned by `naive_apply_keys_only`. -#[derive(Debug, Clone)] -pub struct NaiveKeysOnlyApply { - keys_only: bool, - stream: S, -} - -impl Stream for NaiveKeysOnlyApply -where - S: Stream, - T: Default, -{ - type Item = (String, T); - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - if self.keys_only { - Ok(Async::Ready(try_ready!(self.stream.poll()).map(|mut v| { - v.1 = Default::default(); - v - }))) - } else { - self.stream.poll() - } - } -} - -/// Filters the result of a stream to only keep the results with a prefix. -#[inline] -pub fn naive_apply_prefix<'a, S, T>(stream: S, prefix: Cow<'a, str>) -> NaivePrefixApply<'a, S> -where - S: Stream, -{ - NaivePrefixApply { - prefix: prefix, - stream: stream, - } -} - -/// Returned by `naive_apply_prefix`. -#[derive(Debug, Clone)] -pub struct NaivePrefixApply<'a, S> { - prefix: Cow<'a, str>, - stream: S, -} - -impl<'a, S, T> Stream for NaivePrefixApply<'a, S> -where - S: Stream, -{ - type Item = (String, T); - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - loop { - let item = try_ready!(self.stream.poll()); - match item { - Some(i) => { - if i.0.starts_with(&*self.prefix) { - return Ok(Async::Ready(Some(i))); - } - } - None => return Ok(Async::Ready(None)), - } - } - } -} - -/// Applies orderings on the stream data. Will simply pass data through if the list of orderings -/// is empty. Otherwise will need to collect. -pub fn naive_apply_ordered<'a, S, I, V>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S, V> -where - S: Stream + 'a, - I: IntoIterator, - I::IntoIter: 'a, - V: PartialOrd + 'static, -{ - let orders_iter = orders_iter.into_iter(); - if orders_iter.size_hint().1 == Some(0) { - return NaiveApplyOrdered { - inner: NaiveApplyOrderedInner::PassThrough(stream), - }; - } - - let collected = stream - .collect() - .and_then(move |mut collected| { - for order in orders_iter { - match order { - Order::ByValueAsc => { - collected.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal)); - } - Order::ByValueDesc => { - collected.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal)); - } - Order::ByKeyAsc => { - collected.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal)); - } - Order::ByKeyDesc => { - collected.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(Ordering::Equal)); - } - } - } - Ok(iter_ok(collected.into_iter())) - }) - .flatten_stream(); - - NaiveApplyOrdered { - inner: NaiveApplyOrderedInner::Collected(Box::new(collected)), - } -} - -/// Returned by `naive_apply_ordered`. -pub struct NaiveApplyOrdered<'a, S, T> { - inner: NaiveApplyOrderedInner<'a, S, T>, -} - -enum NaiveApplyOrderedInner<'a, S, T> { - PassThrough(S), - Collected(Box + 'a>), -} - -impl<'a, S, V> Stream for NaiveApplyOrdered<'a, S, V> -where - S: Stream, -{ - type Item = (String, V); - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - match self.inner { - NaiveApplyOrderedInner::PassThrough(ref mut s) => s.poll(), - NaiveApplyOrderedInner::Collected(ref mut s) => s.poll(), - } - } -} - -/// Filters the result of a stream to apply a set of filters. -#[inline] -pub fn naive_apply_filters<'a, S, I, V>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I> -where - S: Stream, - I: Iterator> + Clone, - V: 'a, -{ - NaiveFiltersApply { - filters: filters, - stream: stream, - marker: PhantomData, - } -} - -/// Returned by `naive_apply_prefix`. -#[derive(Debug, Clone)] -pub struct NaiveFiltersApply<'a, S, I> { - filters: I, - stream: S, - marker: PhantomData<&'a ()>, -} - -impl<'a, S, I, T> Stream for NaiveFiltersApply<'a, S, I> -where - S: Stream, - I: Iterator> + Clone, - T: PartialOrd + 'a, -{ - type Item = (String, T); - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - 'outer: loop { - let item = try_ready!(self.stream.poll()); - match item { - Some(i) => { - for filter in self.filters.clone() { - if !naive_filter_test(&i, &filter) { - continue 'outer; - } - } - return Ok(Async::Ready(Some(i))); - } - None => return Ok(Async::Ready(None)), - } - } - } -} - -#[inline] -fn naive_filter_test(entry: &(String, T), filter: &Filter) -> bool -where - T: PartialOrd, -{ - let (expected_ordering, revert_expected) = match filter.operation { - FilterOp::Equal => (Ordering::Equal, false), - FilterOp::NotEqual => (Ordering::Equal, true), - FilterOp::Less => (Ordering::Less, false), - FilterOp::GreaterOrEqual => (Ordering::Less, true), - FilterOp::Greater => (Ordering::Greater, false), - FilterOp::LessOrEqual => (Ordering::Greater, true), - }; - - match filter.ty { - FilterTy::KeyCompare(ref ref_value) => { - ((&*entry.0).cmp(&**ref_value) == expected_ordering) != revert_expected - } - FilterTy::ValueCompare(ref ref_value) => { - (entry.1.partial_cmp(&**ref_value) == Some(expected_ordering)) != revert_expected - } - } -} diff --git a/stores/peerstore/Cargo.toml b/stores/peerstore/Cargo.toml deleted file mode 100644 index d75dba46..00000000 --- a/stores/peerstore/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "libp2p-peerstore" -version = "0.1.0" -authors = ["Parity Technologies "] -license = "MIT" - -[dependencies] -bs58 = "0.2.0" -datastore = { path = "../../stores/datastore" } -futures = "0.1.0" -owning_ref = "0.3.3" -libp2p-core = { path = "../../core" } -multiaddr = { path = "../../misc/multiaddr" } -serde = "1.0.70" -serde_derive = "1.0.70" - -[dev-dependencies] -tempfile = "2.2" -serde_json = "1.0" -multihash = { path = "../../misc/multihash" } diff --git a/stores/peerstore/src/json_peerstore.rs b/stores/peerstore/src/json_peerstore.rs deleted file mode 100644 index e25b3f38..00000000 --- a/stores/peerstore/src/json_peerstore.rs +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Implementation of the `Peerstore` trait that uses a single JSON file as backend. - -use super::TTL; -use bs58; -use datastore::{Datastore, JsonFileDatastore, JsonFileDatastoreEntry, Query}; -use futures::{Future, Stream}; -use multiaddr::Multiaddr; -use peer_info::{AddAddrBehaviour, PeerInfo}; -use peerstore::{PeerAccess, Peerstore}; -use std::io::Error as IoError; -use std::iter; -use std::path::PathBuf; -use std::vec::IntoIter as VecIntoIter; -use PeerId; - -/// Peerstore backend that uses a Json file. -pub struct JsonPeerstore { - store: JsonFileDatastore, -} - -impl JsonPeerstore { - /// Opens a new peerstore tied to a JSON file at the given path. - /// - /// If the file exists, this function will open it. In any case, flushing the peerstore or - /// destroying it will write to the file. - #[inline] - pub fn new

(path: P) -> Result - where - P: Into, - { - Ok(JsonPeerstore { - store: JsonFileDatastore::new(path)?, - }) - } - - /// Flushes the content of the peer store to the disk. - /// - /// This function can only fail in case of a disk access error. If an error occurs, any change - /// to the peerstore that was performed since the last successful flush will be lost. No data - /// will be corrupted. - #[inline] - pub fn flush(&self) -> Result<(), IoError> { - self.store.flush() - } -} - -impl<'a> Peerstore for &'a JsonPeerstore { - type PeerAccess = JsonPeerstoreAccess<'a>; - type PeersIter = Box>; - - #[inline] - fn peer(self, peer_id: &PeerId) -> Option { - let hash = bs58::encode(peer_id.as_bytes()).into_string(); - self.store.lock(hash.into()).map(JsonPeerstoreAccess) - } - - #[inline] - fn peer_or_create(self, peer_id: &PeerId) -> Self::PeerAccess { - let hash = bs58::encode(peer_id.as_bytes()).into_string(); - JsonPeerstoreAccess(self.store.lock_or_create(hash.into())) - } - - fn peers(self) -> Self::PeersIter { - let query = self.store.query(Query { - prefix: "".into(), - filters: vec![], - orders: vec![], - skip: 0, - limit: u64::max_value(), - keys_only: false, - }); - - let list = query - .filter_map(|(key, info)| { - if info.addrs().count() == 0 { - return None // all addresses are expired - } - // We filter out invalid elements. This can happen if the JSON storage file was - // corrupted or manually modified by the user. - PeerId::from_bytes(bs58::decode(key).into_vec().ok()?).ok() - }) - .collect() - .wait(); // Wait can never block for the JSON datastore. - - // Need to handle I/O errors. Again we just ignore. - if let Ok(list) = list { - Box::new(list.into_iter()) as Box<_> - } else { - Box::new(iter::empty()) as Box<_> - } - } -} - -pub struct JsonPeerstoreAccess<'a>(JsonFileDatastoreEntry<'a, PeerInfo>); - -impl<'a> PeerAccess for JsonPeerstoreAccess<'a> { - type AddrsIter = VecIntoIter; - - #[inline] - fn addrs(&self) -> Self::AddrsIter { - self.0.addrs().cloned().collect::>().into_iter() - } - - #[inline] - fn add_addr(&mut self, addr: Multiaddr, ttl: TTL) { - self.0 - .add_addr(addr, ttl, AddAddrBehaviour::IgnoreTtlIfInferior); - } - - #[inline] - fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL) { - self.0.add_addr(addr, ttl, AddAddrBehaviour::OverwriteTtl); - } - - #[inline] - fn clear_addrs(&mut self) { - self.0.set_addrs(iter::empty()); - } -} - -#[cfg(test)] -mod tests { - extern crate tempfile; - peerstore_tests!( - {::json_peerstore::JsonPeerstore::new(temp_file.path()).unwrap()} - {let temp_file = self::tempfile::NamedTempFile::new().unwrap()} - ); - - #[test] - fn reload() { - let temp_file = self::tempfile::NamedTempFile::new().unwrap(); - let peer_store = ::json_peerstore::JsonPeerstore::new(temp_file.path()).unwrap(); - - let peer_id = PeerId::random(); - let addr = "/ip4/0.0.0.0/tcp/0".parse::().unwrap(); - - peer_store - .peer_or_create(&peer_id) - .add_addr(addr.clone(), Duration::from_millis(5000)); - peer_store.flush().unwrap(); - drop(peer_store); - - let peer_store = ::json_peerstore::JsonPeerstore::new(temp_file.path()).unwrap(); - let addrs = peer_store - .peer(&peer_id) - .unwrap() - .addrs() - .collect::>(); - assert_eq!(addrs, &[addr]); - } -} diff --git a/stores/peerstore/src/lib.rs b/stores/peerstore/src/lib.rs deleted file mode 100644 index f20947e4..00000000 --- a/stores/peerstore/src/lib.rs +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! The `peerstore` crate allows one to store information about a peer. -//! -//! `peerstore` is a key-value database, where the keys are multihashes (which usually corresponds -//! to the hash of the public key of the peer, but that is not enforced by this crate) and the -//! values are the public key and a list of multiaddresses. Additionally, the multiaddresses stored -//! by the `peerstore` have a time-to-live after which they disappear. -//! -//! This crate consists of a generic `Peerstore` trait and the follow implementations: -//! -//! - `JsonPeerstore`: Stores the information in a single JSON file. -//! - `MemoryPeerstore`: Stores the information in memory. -//! -//! Note that the peerstore implementations do not consider information inside a peer store to be -//! critical. In case of an error (e.g. corrupted file, disk error, etc.) they will prefer to lose -//! data rather than returning the error. -//! -//! # Example -//! -//! ``` -//! extern crate multiaddr; -//! extern crate libp2p_core; -//! extern crate libp2p_peerstore; -//! -//! # fn main() { -//! use libp2p_core::{PeerId, PublicKey}; -//! use libp2p_peerstore::memory_peerstore::MemoryPeerstore; -//! use libp2p_peerstore::{Peerstore, PeerAccess}; -//! use multiaddr::Multiaddr; -//! use std::time::Duration; -//! -//! // In this example we use a `MemoryPeerstore`, but you can easily swap it for another backend. -//! let mut peerstore = MemoryPeerstore::empty(); -//! let peer_id = PeerId::random(); -//! -//! // Let's write some information about a peer. -//! { -//! // `peer_or_create` mutably borrows the peerstore, so we have to do it in a local scope. -//! let mut peer = peerstore.peer_or_create(&peer_id); -//! peer.add_addr("/ip4/10.11.12.13/tcp/20000".parse::().unwrap(), -//! Duration::from_millis(5000)); -//! } -//! -//! // Now let's load back the info. -//! { -//! let mut peer = peerstore.peer(&peer_id).expect("peer doesn't exist in the peerstore"); -//! assert_eq!(peer.addrs().collect::>(), -//! &["/ip4/10.11.12.13/tcp/20000".parse::().unwrap()]); -//! } -//! # } -//! ``` - -extern crate bs58; -extern crate datastore; -extern crate futures; -extern crate libp2p_core; -extern crate multiaddr; -extern crate owning_ref; -extern crate serde; -#[macro_use] -extern crate serde_derive; - -// TODO: remove -pub use self::libp2p_core::PeerId; -pub use self::peerstore::{PeerAccess, Peerstore}; - -#[macro_use] -mod peerstore_tests; - -pub mod json_peerstore; -pub mod memory_peerstore; -mod peer_info; -mod peerstore; - -pub type TTL = std::time::Duration; diff --git a/stores/peerstore/src/memory_peerstore.rs b/stores/peerstore/src/memory_peerstore.rs deleted file mode 100644 index 90e3e005..00000000 --- a/stores/peerstore/src/memory_peerstore.rs +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Implementation of the `Peerstore` trait that simple stores peers in memory. - -use super::TTL; -use multiaddr::Multiaddr; -use owning_ref::OwningRefMut; -use peer_info::{AddAddrBehaviour, PeerInfo}; -use peerstore::{PeerAccess, Peerstore}; -use std::collections::HashMap; -use std::iter; -use std::sync::{Mutex, MutexGuard}; -use std::vec::IntoIter as VecIntoIter; -use PeerId; - -/// Implementation of the `Peerstore` trait that simply stores the peer information in memory. -#[derive(Debug)] -pub struct MemoryPeerstore { - store: Mutex>, -} - -impl MemoryPeerstore { - /// Initializes a new `MemoryPeerstore`. The database is initially empty. - #[inline] - pub fn empty() -> MemoryPeerstore { - MemoryPeerstore { - store: Mutex::new(HashMap::new()), - } - } -} - -impl Default for MemoryPeerstore { - #[inline] - fn default() -> MemoryPeerstore { - MemoryPeerstore::empty() - } -} - -impl<'a> Peerstore for &'a MemoryPeerstore { - type PeerAccess = MemoryPeerstoreAccess<'a>; - type PeersIter = VecIntoIter; - - fn peer(self, peer_id: &PeerId) -> Option { - let lock = self.store.lock().unwrap(); - OwningRefMut::new(lock) - .try_map_mut(|n| n.get_mut(peer_id).ok_or(())) - .ok() - .map(MemoryPeerstoreAccess) - } - - fn peer_or_create(self, peer_id: &PeerId) -> Self::PeerAccess { - let lock = self.store.lock().unwrap(); - let r = OwningRefMut::new(lock) - .map_mut(|n| n.entry(peer_id.clone()).or_insert_with(|| PeerInfo::new())); - MemoryPeerstoreAccess(r) - } - - fn peers(self) -> Self::PeersIter { - let lock = self.store.lock().unwrap(); - lock.iter() - .filter_map(|(id, info)| { - if info.addrs().count() == 0 { - return None // all addresses are expired - } - Some(id.clone()) - }) - .collect::>().into_iter() - } -} - -// Note: Rust doesn't provide a `MutexGuard::map` method, otherwise we could directly store a -// `MutexGuard<'a, (&'a PeerId, &'a PeerInfo)>`. -pub struct MemoryPeerstoreAccess<'a>( - OwningRefMut>, PeerInfo>, -); - -impl<'a> PeerAccess for MemoryPeerstoreAccess<'a> { - type AddrsIter = VecIntoIter; - - #[inline] - fn addrs(&self) -> Self::AddrsIter { - self.0.addrs().cloned().collect::>().into_iter() - } - - #[inline] - fn add_addr(&mut self, addr: Multiaddr, ttl: TTL) { - self.0 - .add_addr(addr, ttl, AddAddrBehaviour::IgnoreTtlIfInferior); - } - - #[inline] - fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL) { - self.0.add_addr(addr, ttl, AddAddrBehaviour::OverwriteTtl); - } - - #[inline] - fn clear_addrs(&mut self) { - self.0.set_addrs(iter::empty()); - } -} - -#[cfg(test)] -mod tests { - peerstore_tests!({ ::memory_peerstore::MemoryPeerstore::empty() }); -} diff --git a/stores/peerstore/src/peer_info.rs b/stores/peerstore/src/peer_info.rs deleted file mode 100644 index 7641c71f..00000000 --- a/stores/peerstore/src/peer_info.rs +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! The objective of the `peerstore` crate is to provide a key-value storage. Keys are peer IDs, -//! and the `PeerInfo` struct in this module is the value. -//! -//! Note that the `PeerInfo` struct implements `PartialOrd` in a dummy way so that it can be stored -//! in a `Datastore`. -//! If the `PeerInfo` struct ever gets exposed to the public API of the crate, we may want to give -//! more thoughts about this. - -use multiaddr::Multiaddr; -use std::cmp::Ordering; -use std::time::SystemTime; -use TTL; - -/// Information about a peer. -#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] -pub struct PeerInfo { - // Adresses, and the time at which they will be considered expired. - addrs: Vec<(Multiaddr, SystemTime)>, -} - -impl PeerInfo { - /// Builds a new empty `PeerInfo`. - #[inline] - pub fn new() -> PeerInfo { - PeerInfo { addrs: vec![] } - } - - /// Returns the list of the non-expired addresses stored in this `PeerInfo`. - /// - /// > **Note**: Keep in mind that this function is racy because addresses can expire between - /// > the moment when you get them and the moment when you process them. - #[inline] - pub fn addrs<'a>(&'a self) -> impl Iterator + 'a { - let now = SystemTime::now(); - self.addrs.iter().filter_map(move |(addr, expires)| { - if *expires >= now { - Some(addr) - } else { - None - } - }) - } - - /// Sets the list of addresses and their time-to-live. - /// - /// This removes all previously-stored addresses and replaces them with new ones. - #[inline] - pub fn set_addrs(&mut self, addrs: I) - where - I: IntoIterator, - { - let now = SystemTime::now(); - self.addrs = addrs - .into_iter() - .map(move |(addr, ttl)| (addr, now + ttl)) - .collect(); - } - - /// Adds a single address and its time-to-live. - /// - /// If the peer info already knows about that address, then what happens depends on the - /// `behaviour` parameter. - pub fn add_addr(&mut self, addr: Multiaddr, ttl: TTL, behaviour: AddAddrBehaviour) { - let expires = SystemTime::now() + ttl; - - if let Some(&mut (_, ref mut existing_expires)) = - self.addrs.iter_mut().find(|&&mut (ref a, _)| a == &addr) - { - if behaviour == AddAddrBehaviour::OverwriteTtl || *existing_expires < expires { - *existing_expires = expires; - } - return; - } - - self.addrs.push((addr, expires)); - } -} - -/// Behaviour of the `add_addr` function. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum AddAddrBehaviour { - /// Always overwrite the existing TTL. - OverwriteTtl, - /// Don't overwrite if the TTL is larger. - IgnoreTtlIfInferior, -} - -// The reason why we need to implement the PartialOrd trait is that the datastore library (a -// key-value storage) which we use allows performing queries where the results can be ordered. -// -// Since the struct that implements PartialOrd is internal and since we never use this ordering -// feature, I think it's ok to have this code. -impl PartialOrd for PeerInfo { - #[inline] - fn partial_cmp(&self, _other: &Self) -> Option { - None - } -} - - -#[cfg(test)] -mod tests { - extern crate serde_json; - use super::*; - use std::time::UNIX_EPOCH; - - #[test] - fn ser_and_deser() { - let peer_info = PeerInfo { - addrs: vec![( - "/ip4/0.0.0.0/tcp/0".parse::().unwrap(), - UNIX_EPOCH, - )], - }; - let serialized = serde_json::to_string(&peer_info).unwrap(); - let deserialized: PeerInfo = serde_json::from_str(&serialized).unwrap(); - assert_eq!(peer_info, deserialized); - } -} \ No newline at end of file diff --git a/stores/peerstore/src/peerstore.rs b/stores/peerstore/src/peerstore.rs deleted file mode 100644 index b3090d2d..00000000 --- a/stores/peerstore/src/peerstore.rs +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use multiaddr::Multiaddr; -use std::time::Duration; -use {PeerId, TTL}; - -/// Implemented on objects that store peers. -/// -/// Note that the methods of this trait take by ownership (i.e. `self` instead of `&self` or -/// `&mut self`). This was made so that the associated types could hold `self` internally and -/// because Rust doesn't have higher-ranked trait bounds yet. -/// -/// Therefore this trait should likely be implemented on `&'a ConcretePeerstore` instead of -/// on `ConcretePeerstore`. -pub trait Peerstore { - /// Grants access to the a peer inside the peer store. - type PeerAccess: PeerAccess; - /// List of the peers in this peer store. - type PeersIter: Iterator; - - /// Grants access to a peer by its ID. - fn peer(self, peer_id: &PeerId) -> Option; - - /// Grants access to a peer by its ID or creates it. - fn peer_or_create(self, peer_id: &PeerId) -> Self::PeerAccess; - - /// Returns a list of peers in this peer store. - /// - /// Keep in mind that the trait implementation may allow new peers to be added or removed at - /// any time. If that is the case, you have to take into account that this is only an - /// indication. - fn peers(self) -> Self::PeersIter; -} - -/// Implemented on objects that represent an open access to a peer stored in a peer store. -/// -/// The exact semantics of "open access" depend on the trait implementation. -pub trait PeerAccess { - /// Iterator returned by `addrs`. - type AddrsIter: Iterator; - - /// Returns all known and non-expired addresses for a given peer. - /// - /// > **Note**: Keep in mind that this function is racy because addresses can expire between - /// > the moment when you get them and the moment when you process them. - fn addrs(&self) -> Self::AddrsIter; - - /// Adds an address to a peer. - /// - /// If the manager already has this address stored and with a longer TTL, then the operation - /// is a no-op. - fn add_addr(&mut self, addr: Multiaddr, ttl: TTL); - - // Similar to calling `add_addr` multiple times in a row. - #[inline] - fn add_addrs(&mut self, addrs: I, ttl: TTL) - where - I: IntoIterator, - { - for addr in addrs.into_iter() { - self.add_addr(addr, ttl); - } - } - - /// Removes an address from a peer. - #[inline] - fn rm_addr(&mut self, addr: Multiaddr) { - self.set_addr_ttl(addr, Duration::new(0, 0)); - } - - // Similar to calling `rm_addr` multiple times in a row. - fn rm_addrs(&mut self, addrs: I) - where - I: IntoIterator, - { - for addr in addrs.into_iter() { - self.rm_addr(addr); - } - } - - /// Sets the TTL of an address of a peer. Adds the address if it is currently unknown. - /// - /// Contrary to `add_addr`, this operation is never a no-op. - fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL); - - // Similar to calling `set_addr_ttl` multiple times in a row. - fn set_addrs_ttl(&mut self, addrs: I, ttl: TTL) - where - I: IntoIterator, - { - for addr in addrs.into_iter() { - self.set_addr_ttl(addr, ttl); - } - } - - /// Removes all previously stored addresses. - fn clear_addrs(&mut self); -} diff --git a/stores/peerstore/src/peerstore_tests.rs b/stores/peerstore/src/peerstore_tests.rs deleted file mode 100644 index 156e336e..00000000 --- a/stores/peerstore/src/peerstore_tests.rs +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Contains the `peerstore_tests!` macro which will test an implementation of `Peerstore`. -//! You need to pass as first parameter a way to create the `Peerstore` that you want to test. -//! -//! You can also pass as additional parameters a list of statements that will be inserted before -//! each test. This allows you to have the peerstore builder use variables created by these -//! statements. - -#![cfg(test)] - -macro_rules! peerstore_tests { - ({$create_peerstore:expr} $({$stmt:stmt})*) => { - extern crate multihash; - use std::thread; - use std::time::Duration; - use {Peerstore, PeerAccess, PeerId}; - use multiaddr::Multiaddr; - - #[test] - fn initially_empty() { - $($stmt;)* - let peer_store = $create_peerstore; - let peer_id = PeerId::random(); - assert_eq!(peer_store.peers().count(), 0); - assert!(peer_store.peer(&peer_id).is_none()); - } - - #[test] - fn set_then_get_addr() { - $($stmt;)* - let peer_store = $create_peerstore; - let peer_id = PeerId::random(); - let addr = "/ip4/0.0.0.0/tcp/0".parse::().unwrap(); - - peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(5000)); - - let addrs = peer_store.peer(&peer_id).unwrap().addrs().collect::>(); - assert_eq!(addrs, &[addr]); - } - - #[test] - fn add_expired_addr() { - // Add an already-expired address to a peer. - $($stmt;)* - let peer_store = $create_peerstore; - let peer_id = PeerId::random(); - let addr = "/ip4/0.0.0.0/tcp/0".parse::().unwrap(); - - peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(0)); - thread::sleep(Duration::from_millis(2)); - - let addrs = peer_store.peer(&peer_id).unwrap().addrs(); - assert_eq!(addrs.count(), 0); - } - - #[test] - fn clear_addrs() { - $($stmt;)* - let peer_store = $create_peerstore; - let peer_id = PeerId::random(); - let addr = "/ip4/0.0.0.0/tcp/0".parse::().unwrap(); - - peer_store.peer_or_create(&peer_id) - .add_addr(addr.clone(), Duration::from_millis(5000)); - peer_store.peer(&peer_id).unwrap().clear_addrs(); - - let addrs = peer_store.peer(&peer_id).unwrap().addrs(); - assert_eq!(addrs.count(), 0); - } - - #[test] - fn no_update_ttl() { - $($stmt;)* - let peer_store = $create_peerstore; - let peer_id = PeerId::random(); - - let addr1 = "/ip4/0.0.0.0/tcp/0".parse::().unwrap(); - let addr2 = "/ip4/0.0.0.1/tcp/0".parse::().unwrap(); - - peer_store.peer_or_create(&peer_id) - .add_addr(addr1.clone(), Duration::from_millis(5000)); - peer_store.peer_or_create(&peer_id) - .add_addr(addr2.clone(), Duration::from_millis(5000)); - assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 2); - - // `add_addr` must not overwrite the TTL because it's already higher - peer_store.peer_or_create(&peer_id).add_addr(addr1.clone(), Duration::from_millis(0)); - thread::sleep(Duration::from_millis(2)); - assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 2); - } - - #[test] - fn force_update_ttl() { - $($stmt;)* - let peer_store = $create_peerstore; - let peer_id = PeerId::random(); - - let addr1 = "/ip4/0.0.0.0/tcp/0".parse::().unwrap(); - let addr2 = "/ip4/0.0.0.1/tcp/0".parse::().unwrap(); - - peer_store.peer_or_create(&peer_id) - .add_addr(addr1.clone(), Duration::from_millis(5000)); - peer_store.peer_or_create(&peer_id) - .add_addr(addr2.clone(), Duration::from_millis(5000)); - assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 2); - - peer_store.peer_or_create(&peer_id) - .set_addr_ttl(addr1.clone(), Duration::from_millis(0)); - thread::sleep(Duration::from_millis(2)); - assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 1); - } - }; -} diff --git a/transports/relay/Cargo.toml b/transports/relay/Cargo.toml deleted file mode 100644 index 45f7045e..00000000 --- a/transports/relay/Cargo.toml +++ /dev/null @@ -1,19 +0,0 @@ -[package] -name = "libp2p-relay" -version = "0.1.0" -authors = ["Parity Technologies "] -license = "MIT" - -[dependencies] -bytes = "0.4" -futures = "0.1" -libp2p-peerstore = { path = "../../stores/peerstore" } -libp2p-core = { path = "../../core" } -log = "0.4" -multiaddr = { path = "../../misc/multiaddr" } -protobuf = "2.0.2" -rand = "0.6" -tokio-codec = "0.1" -tokio-io = "0.1" -unsigned-varint = { version = "0.2.1", features = ["codec"] } -void = "1" diff --git a/transports/relay/src/copy.rs b/transports/relay/src/copy.rs deleted file mode 100644 index 389b861b..00000000 --- a/transports/relay/src/copy.rs +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright (c) 2018 Tokio Contributors -// -// Permission is hereby granted, free of charge, to any -// person obtaining a copy of this software and associated -// documentation files (the "Software"), to deal in the -// Software without restriction, including without -// limitation the rights to use, copy, modify, merge, -// publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software -// is furnished to do so, subject to the following -// conditions: -// -// The above copyright notice and this permission notice -// shall be included in all copies or substantial portions -// of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -// Based on copy.rs from tokio-io in https://github.com/tokio-rs/tokio - -use futures::{Future, Poll, try_ready}; -use std::io; -use tokio_io::{AsyncRead, AsyncWrite}; - -/// A future which will copy all data from a reader into a writer. -/// -/// Created by the [`copy`] function, this future will resolve to the number of -/// bytes copied or an error if one happens. -/// -/// [`copy`]: fn.copy.html -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct Copy { - reader: Option, - read_done: bool, - flush_done: bool, - writer: Option, - pos: usize, - cap: usize, - amt: u64, - buf: Box<[u8]>, -} - -/// Creates a future which represents copying all the bytes from one object to -/// another. -/// -/// The returned future will copy all the bytes read from `reader` into the -/// `writer` specified. This future will only complete once the `reader` has hit -/// EOF and all bytes have been written to and flushed from the `writer` -/// provided. -/// -/// On success the number of bytes is returned and the `reader` and `writer` are -/// consumed. On error the error is returned and the I/O objects are consumed as -/// well. -pub fn flushing_copy(reader: R, writer: W) -> Copy - where R: AsyncRead, - W: AsyncWrite, -{ - Copy { - reader: Some(reader), - read_done: false, - flush_done: true, - writer: Some(writer), - amt: 0, - pos: 0, - cap: 0, - buf: Box::new([0; 2048]), - } -} - -impl Future for Copy - where R: AsyncRead, - W: AsyncWrite, -{ - type Item = (u64, R, W); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(u64, R, W), io::Error> { - debug_assert!(self.reader.is_some() && self.writer.is_some(), - "poll() has been called again after returning Ok"); - - loop { - // Still not finished flushing - if !self.flush_done { - try_ready!(self.writer.as_mut().unwrap().poll_flush()); - self.flush_done = true - } - - // If our buffer is empty, then we need to read some data to - // continue. - if self.pos == self.cap && !self.read_done { - let reader = self.reader.as_mut().unwrap(); - let n = try_ready!(reader.poll_read(&mut self.buf)); - if n == 0 { - self.read_done = true; - } else { - self.pos = 0; - self.cap = n; - } - } - - // If our buffer has some data, let's write it out! - while self.pos < self.cap { - let writer = self.writer.as_mut().unwrap(); - let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap])); - if i == 0 { - return Err(io::Error::new(io::ErrorKind::WriteZero, - "write zero byte into writer")); - } else { - self.pos += i; - self.amt += i as u64; - } - } - - // The buffered data has been written, let's flush it! - if self.pos == self.cap && !self.read_done { - self.flush_done = false; - continue - } - - // Everything has been copied. - if self.pos == self.cap && self.read_done { - try_ready!(self.writer.as_mut().unwrap().poll_flush()); - let reader = self.reader.take().unwrap(); - let writer = self.writer.take().unwrap(); - return Ok((self.amt, reader, writer).into()) - } - } - } -} - diff --git a/transports/relay/src/error.rs b/transports/relay/src/error.rs deleted file mode 100644 index 0dca557f..00000000 --- a/transports/relay/src/error.rs +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use libp2p_core::{PeerId, upgrade::UpgradeError}; -use std::{fmt, io}; - -#[derive(Debug)] -pub enum RelayError { - Io(io::Error), - Upgrade(UpgradeError), - NoRelayFor(PeerId), - Message(&'static str), - #[doc(hidden)] - __Nonexhaustive -} - - -impl fmt::Display for RelayError -where - E: fmt::Display -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - RelayError::Io(e) => write!(f, "i/o error: {}", e), - RelayError::Upgrade(e) => write!(f, "upgrade error: {}", e), - RelayError::NoRelayFor(p) => write!(f, "no relay for peer: {:?}", p), - RelayError::Message(m) => write!(f, "{}", m), - RelayError::__Nonexhaustive => f.write_str("__Nonexhaustive") - } - } -} - -impl std::error::Error for RelayError -where - E: std::error::Error -{ - fn cause(&self) -> Option<&dyn std::error::Error> { - match self { - RelayError::Io(e) => Some(e), - RelayError::Upgrade(e) => Some(e), - RelayError::NoRelayFor(_) => None, - RelayError::Message(_) => None, - RelayError::__Nonexhaustive => None - } - } -} - -impl From for RelayError { - fn from(e: io::Error) -> Self { - RelayError::Io(e) - } -} - -impl From> for RelayError { - fn from(e: UpgradeError) -> Self { - RelayError::Upgrade(e) - } -} - diff --git a/transports/relay/src/lib.rs b/transports/relay/src/lib.rs deleted file mode 100644 index cbdc40ef..00000000 --- a/transports/relay/src/lib.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -extern crate bytes; -extern crate futures; -extern crate libp2p_peerstore as peerstore; -extern crate libp2p_core; -extern crate log; -extern crate multiaddr; -extern crate protobuf; -extern crate rand; -extern crate tokio_codec; -extern crate tokio_io; -extern crate unsigned_varint; -extern crate void; - -mod copy; -mod error; -mod message; -mod protocol; -mod transport; -mod utility; - -pub use protocol::{Output, RelayConfig}; -pub use transport::RelayTransport; diff --git a/transports/relay/src/message.proto b/transports/relay/src/message.proto deleted file mode 100644 index 7e382390..00000000 --- a/transports/relay/src/message.proto +++ /dev/null @@ -1,42 +0,0 @@ -syntax = "proto2"; - -message CircuitRelay { - - enum Status { - SUCCESS = 100; - HOP_SRC_ADDR_TOO_LONG = 220; - HOP_DST_ADDR_TOO_LONG = 221; - HOP_SRC_MULTIADDR_INVALID = 250; - HOP_DST_MULTIADDR_INVALID = 251; - HOP_NO_CONN_TO_DST = 260; - HOP_CANT_DIAL_DST = 261; - HOP_CANT_OPEN_DST_STREAM = 262; - HOP_CANT_SPEAK_RELAY = 270; - HOP_CANT_RELAY_TO_SELF = 280; - STOP_SRC_ADDR_TOO_LONG = 320; - STOP_DST_ADDR_TOO_LONG = 321; - STOP_SRC_MULTIADDR_INVALID = 350; - STOP_DST_MULTIADDR_INVALID = 351; - STOP_RELAY_REFUSED = 390; - MALFORMED_MESSAGE = 400; - } - - enum Type { // RPC identifier, either HOP, STOP or STATUS - HOP = 1; - STOP = 2; - STATUS = 3; - CAN_HOP = 4; // is peer a relay? - } - - message Peer { - required bytes id = 1; // peer id - repeated bytes addrs = 2; // peer's known addresses - } - - optional Type type = 1; // Type of the message - - optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STOP - optional Peer dstPeer = 3; - - optional Status code = 4; // Status code, used when Type is STATUS -} diff --git a/transports/relay/src/message.rs b/transports/relay/src/message.rs deleted file mode 100644 index 73d0736c..00000000 --- a/transports/relay/src/message.rs +++ /dev/null @@ -1,803 +0,0 @@ -// This file is generated by rust-protobuf 2.0.2. Do not edit -// @generated - -// https://github.com/Manishearth/rust-clippy/issues/702 -#![allow(unknown_lints)] -#![allow(clippy)] - -#![cfg_attr(rustfmt, rustfmt_skip)] - -#![allow(box_pointers)] -#![allow(dead_code)] -#![allow(missing_docs)] -#![allow(non_camel_case_types)] -#![allow(non_snake_case)] -#![allow(non_upper_case_globals)] -#![allow(trivial_casts)] -#![allow(unsafe_code)] -#![allow(unused_imports)] -#![allow(unused_results)] - -use protobuf::Message as Message_imported_for_functions; -use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions; - -#[derive(PartialEq,Clone,Default)] -pub struct CircuitRelay { - // message fields - field_type: ::std::option::Option, - srcPeer: ::protobuf::SingularPtrField, - dstPeer: ::protobuf::SingularPtrField, - code: ::std::option::Option, - // special fields - unknown_fields: ::protobuf::UnknownFields, - cached_size: ::protobuf::CachedSize, -} - -impl CircuitRelay { - pub fn new() -> CircuitRelay { - ::std::default::Default::default() - } - - // optional .CircuitRelay.Type type = 1; - - pub fn clear_field_type(&mut self) { - self.field_type = ::std::option::Option::None; - } - - pub fn has_field_type(&self) -> bool { - self.field_type.is_some() - } - - // Param is passed by value, moved - pub fn set_field_type(&mut self, v: CircuitRelay_Type) { - self.field_type = ::std::option::Option::Some(v); - } - - pub fn get_field_type(&self) -> CircuitRelay_Type { - self.field_type.unwrap_or(CircuitRelay_Type::HOP) - } - - // optional .CircuitRelay.Peer srcPeer = 2; - - pub fn clear_srcPeer(&mut self) { - self.srcPeer.clear(); - } - - pub fn has_srcPeer(&self) -> bool { - self.srcPeer.is_some() - } - - // Param is passed by value, moved - pub fn set_srcPeer(&mut self, v: CircuitRelay_Peer) { - self.srcPeer = ::protobuf::SingularPtrField::some(v); - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_srcPeer(&mut self) -> &mut CircuitRelay_Peer { - if self.srcPeer.is_none() { - self.srcPeer.set_default(); - } - self.srcPeer.as_mut().unwrap() - } - - // Take field - pub fn take_srcPeer(&mut self) -> CircuitRelay_Peer { - self.srcPeer.take().unwrap_or_else(|| CircuitRelay_Peer::new()) - } - - pub fn get_srcPeer(&self) -> &CircuitRelay_Peer { - self.srcPeer.as_ref().unwrap_or_else(|| CircuitRelay_Peer::default_instance()) - } - - // optional .CircuitRelay.Peer dstPeer = 3; - - pub fn clear_dstPeer(&mut self) { - self.dstPeer.clear(); - } - - pub fn has_dstPeer(&self) -> bool { - self.dstPeer.is_some() - } - - // Param is passed by value, moved - pub fn set_dstPeer(&mut self, v: CircuitRelay_Peer) { - self.dstPeer = ::protobuf::SingularPtrField::some(v); - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_dstPeer(&mut self) -> &mut CircuitRelay_Peer { - if self.dstPeer.is_none() { - self.dstPeer.set_default(); - } - self.dstPeer.as_mut().unwrap() - } - - // Take field - pub fn take_dstPeer(&mut self) -> CircuitRelay_Peer { - self.dstPeer.take().unwrap_or_else(|| CircuitRelay_Peer::new()) - } - - pub fn get_dstPeer(&self) -> &CircuitRelay_Peer { - self.dstPeer.as_ref().unwrap_or_else(|| CircuitRelay_Peer::default_instance()) - } - - // optional .CircuitRelay.Status code = 4; - - pub fn clear_code(&mut self) { - self.code = ::std::option::Option::None; - } - - pub fn has_code(&self) -> bool { - self.code.is_some() - } - - // Param is passed by value, moved - pub fn set_code(&mut self, v: CircuitRelay_Status) { - self.code = ::std::option::Option::Some(v); - } - - pub fn get_code(&self) -> CircuitRelay_Status { - self.code.unwrap_or(CircuitRelay_Status::SUCCESS) - } -} - -impl ::protobuf::Message for CircuitRelay { - fn is_initialized(&self) -> bool { - for v in &self.srcPeer { - if !v.is_initialized() { - return false; - } - }; - for v in &self.dstPeer { - if !v.is_initialized() { - return false; - } - }; - true - } - - fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> { - while !is.eof()? { - let (field_number, wire_type) = is.read_tag_unpack()?; - match field_number { - 1 => { - ::protobuf::rt::read_proto2_enum_with_unknown_fields_into(wire_type, is, &mut self.field_type, 1, &mut self.unknown_fields)? - }, - 2 => { - ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.srcPeer)?; - }, - 3 => { - ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.dstPeer)?; - }, - 4 => { - ::protobuf::rt::read_proto2_enum_with_unknown_fields_into(wire_type, is, &mut self.code, 4, &mut self.unknown_fields)? - }, - _ => { - ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; - }, - }; - } - ::std::result::Result::Ok(()) - } - - // Compute sizes of nested messages - #[allow(unused_variables)] - fn compute_size(&self) -> u32 { - let mut my_size = 0; - if let Some(v) = self.field_type { - my_size += ::protobuf::rt::enum_size(1, v); - } - if let Some(ref v) = self.srcPeer.as_ref() { - let len = v.compute_size(); - my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; - } - if let Some(ref v) = self.dstPeer.as_ref() { - let len = v.compute_size(); - my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; - } - if let Some(v) = self.code { - my_size += ::protobuf::rt::enum_size(4, v); - } - my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); - self.cached_size.set(my_size); - my_size - } - - fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> { - if let Some(v) = self.field_type { - os.write_enum(1, v.value())?; - } - if let Some(ref v) = self.srcPeer.as_ref() { - os.write_tag(2, ::protobuf::wire_format::WireTypeLengthDelimited)?; - os.write_raw_varint32(v.get_cached_size())?; - v.write_to_with_cached_sizes(os)?; - } - if let Some(ref v) = self.dstPeer.as_ref() { - os.write_tag(3, ::protobuf::wire_format::WireTypeLengthDelimited)?; - os.write_raw_varint32(v.get_cached_size())?; - v.write_to_with_cached_sizes(os)?; - } - if let Some(v) = self.code { - os.write_enum(4, v.value())?; - } - os.write_unknown_fields(self.get_unknown_fields())?; - ::std::result::Result::Ok(()) - } - - fn get_cached_size(&self) -> u32 { - self.cached_size.get() - } - - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - &self.unknown_fields - } - - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - &mut self.unknown_fields - } - - fn as_any(&self) -> &::std::any::Any { - self as &::std::any::Any - } - fn as_any_mut(&mut self) -> &mut ::std::any::Any { - self as &mut ::std::any::Any - } - fn into_any(self: Box) -> ::std::boxed::Box<::std::any::Any> { - self - } - - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - - fn new() -> CircuitRelay { - CircuitRelay::new() - } - - fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { - static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy { - lock: ::protobuf::lazy::ONCE_INIT, - ptr: 0 as *const ::protobuf::reflect::MessageDescriptor, - }; - unsafe { - descriptor.get(|| { - let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_option_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( - "type", - |m: &CircuitRelay| { &m.field_type }, - |m: &mut CircuitRelay| { &mut m.field_type }, - )); - fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage>( - "srcPeer", - |m: &CircuitRelay| { &m.srcPeer }, - |m: &mut CircuitRelay| { &mut m.srcPeer }, - )); - fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage>( - "dstPeer", - |m: &CircuitRelay| { &m.dstPeer }, - |m: &mut CircuitRelay| { &mut m.dstPeer }, - )); - fields.push(::protobuf::reflect::accessor::make_option_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( - "code", - |m: &CircuitRelay| { &m.code }, - |m: &mut CircuitRelay| { &mut m.code }, - )); - ::protobuf::reflect::MessageDescriptor::new::( - "CircuitRelay", - fields, - file_descriptor_proto() - ) - }) - } - } - - fn default_instance() -> &'static CircuitRelay { - static mut instance: ::protobuf::lazy::Lazy = ::protobuf::lazy::Lazy { - lock: ::protobuf::lazy::ONCE_INIT, - ptr: 0 as *const CircuitRelay, - }; - unsafe { - instance.get(CircuitRelay::new) - } - } -} - -impl ::protobuf::Clear for CircuitRelay { - fn clear(&mut self) { - self.clear_field_type(); - self.clear_srcPeer(); - self.clear_dstPeer(); - self.clear_code(); - self.unknown_fields.clear(); - } -} - -impl ::std::fmt::Debug for CircuitRelay { - fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - ::protobuf::text_format::fmt(self, f) - } -} - -impl ::protobuf::reflect::ProtobufValue for CircuitRelay { - fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { - ::protobuf::reflect::ProtobufValueRef::Message(self) - } -} - -#[derive(PartialEq,Clone,Default)] -pub struct CircuitRelay_Peer { - // message fields - id: ::protobuf::SingularField<::std::vec::Vec>, - addrs: ::protobuf::RepeatedField<::std::vec::Vec>, - // special fields - unknown_fields: ::protobuf::UnknownFields, - cached_size: ::protobuf::CachedSize, -} - -impl CircuitRelay_Peer { - pub fn new() -> CircuitRelay_Peer { - ::std::default::Default::default() - } - - // required bytes id = 1; - - pub fn clear_id(&mut self) { - self.id.clear(); - } - - pub fn has_id(&self) -> bool { - self.id.is_some() - } - - // Param is passed by value, moved - pub fn set_id(&mut self, v: ::std::vec::Vec) { - self.id = ::protobuf::SingularField::some(v); - } - - // Mutable pointer to the field. - // If field is not initialized, it is initialized with default value first. - pub fn mut_id(&mut self) -> &mut ::std::vec::Vec { - if self.id.is_none() { - self.id.set_default(); - } - self.id.as_mut().unwrap() - } - - // Take field - pub fn take_id(&mut self) -> ::std::vec::Vec { - self.id.take().unwrap_or_else(|| ::std::vec::Vec::new()) - } - - pub fn get_id(&self) -> &[u8] { - match self.id.as_ref() { - Some(v) => &v, - None => &[], - } - } - - // repeated bytes addrs = 2; - - pub fn clear_addrs(&mut self) { - self.addrs.clear(); - } - - // Param is passed by value, moved - pub fn set_addrs(&mut self, v: ::protobuf::RepeatedField<::std::vec::Vec>) { - self.addrs = v; - } - - // Mutable pointer to the field. - pub fn mut_addrs(&mut self) -> &mut ::protobuf::RepeatedField<::std::vec::Vec> { - &mut self.addrs - } - - // Take field - pub fn take_addrs(&mut self) -> ::protobuf::RepeatedField<::std::vec::Vec> { - ::std::mem::replace(&mut self.addrs, ::protobuf::RepeatedField::new()) - } - - pub fn get_addrs(&self) -> &[::std::vec::Vec] { - &self.addrs - } -} - -impl ::protobuf::Message for CircuitRelay_Peer { - fn is_initialized(&self) -> bool { - if self.id.is_none() { - return false; - } - true - } - - fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> { - while !is.eof()? { - let (field_number, wire_type) = is.read_tag_unpack()?; - match field_number { - 1 => { - ::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.id)?; - }, - 2 => { - ::protobuf::rt::read_repeated_bytes_into(wire_type, is, &mut self.addrs)?; - }, - _ => { - ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; - }, - }; - } - ::std::result::Result::Ok(()) - } - - // Compute sizes of nested messages - #[allow(unused_variables)] - fn compute_size(&self) -> u32 { - let mut my_size = 0; - if let Some(ref v) = self.id.as_ref() { - my_size += ::protobuf::rt::bytes_size(1, &v); - } - for value in &self.addrs { - my_size += ::protobuf::rt::bytes_size(2, &value); - }; - my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); - self.cached_size.set(my_size); - my_size - } - - fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> { - if let Some(ref v) = self.id.as_ref() { - os.write_bytes(1, &v)?; - } - for v in &self.addrs { - os.write_bytes(2, &v)?; - }; - os.write_unknown_fields(self.get_unknown_fields())?; - ::std::result::Result::Ok(()) - } - - fn get_cached_size(&self) -> u32 { - self.cached_size.get() - } - - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - &self.unknown_fields - } - - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - &mut self.unknown_fields - } - - fn as_any(&self) -> &::std::any::Any { - self as &::std::any::Any - } - fn as_any_mut(&mut self) -> &mut ::std::any::Any { - self as &mut ::std::any::Any - } - fn into_any(self: Box) -> ::std::boxed::Box<::std::any::Any> { - self - } - - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - - fn new() -> CircuitRelay_Peer { - CircuitRelay_Peer::new() - } - - fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { - static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy { - lock: ::protobuf::lazy::ONCE_INIT, - ptr: 0 as *const ::protobuf::reflect::MessageDescriptor, - }; - unsafe { - descriptor.get(|| { - let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( - "id", - |m: &CircuitRelay_Peer| { &m.id }, - |m: &mut CircuitRelay_Peer| { &mut m.id }, - )); - fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( - "addrs", - |m: &CircuitRelay_Peer| { &m.addrs }, - |m: &mut CircuitRelay_Peer| { &mut m.addrs }, - )); - ::protobuf::reflect::MessageDescriptor::new::( - "CircuitRelay_Peer", - fields, - file_descriptor_proto() - ) - }) - } - } - - fn default_instance() -> &'static CircuitRelay_Peer { - static mut instance: ::protobuf::lazy::Lazy = ::protobuf::lazy::Lazy { - lock: ::protobuf::lazy::ONCE_INIT, - ptr: 0 as *const CircuitRelay_Peer, - }; - unsafe { - instance.get(CircuitRelay_Peer::new) - } - } -} - -impl ::protobuf::Clear for CircuitRelay_Peer { - fn clear(&mut self) { - self.clear_id(); - self.clear_addrs(); - self.unknown_fields.clear(); - } -} - -impl ::std::fmt::Debug for CircuitRelay_Peer { - fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - ::protobuf::text_format::fmt(self, f) - } -} - -impl ::protobuf::reflect::ProtobufValue for CircuitRelay_Peer { - fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { - ::protobuf::reflect::ProtobufValueRef::Message(self) - } -} - -#[derive(Clone,PartialEq,Eq,Debug,Hash)] -pub enum CircuitRelay_Status { - SUCCESS = 100, - HOP_SRC_ADDR_TOO_LONG = 220, - HOP_DST_ADDR_TOO_LONG = 221, - HOP_SRC_MULTIADDR_INVALID = 250, - HOP_DST_MULTIADDR_INVALID = 251, - HOP_NO_CONN_TO_DST = 260, - HOP_CANT_DIAL_DST = 261, - HOP_CANT_OPEN_DST_STREAM = 262, - HOP_CANT_SPEAK_RELAY = 270, - HOP_CANT_RELAY_TO_SELF = 280, - STOP_SRC_ADDR_TOO_LONG = 320, - STOP_DST_ADDR_TOO_LONG = 321, - STOP_SRC_MULTIADDR_INVALID = 350, - STOP_DST_MULTIADDR_INVALID = 351, - STOP_RELAY_REFUSED = 390, - MALFORMED_MESSAGE = 400, -} - -impl ::protobuf::ProtobufEnum for CircuitRelay_Status { - fn value(&self) -> i32 { - *self as i32 - } - - fn from_i32(value: i32) -> ::std::option::Option { - match value { - 100 => ::std::option::Option::Some(CircuitRelay_Status::SUCCESS), - 220 => ::std::option::Option::Some(CircuitRelay_Status::HOP_SRC_ADDR_TOO_LONG), - 221 => ::std::option::Option::Some(CircuitRelay_Status::HOP_DST_ADDR_TOO_LONG), - 250 => ::std::option::Option::Some(CircuitRelay_Status::HOP_SRC_MULTIADDR_INVALID), - 251 => ::std::option::Option::Some(CircuitRelay_Status::HOP_DST_MULTIADDR_INVALID), - 260 => ::std::option::Option::Some(CircuitRelay_Status::HOP_NO_CONN_TO_DST), - 261 => ::std::option::Option::Some(CircuitRelay_Status::HOP_CANT_DIAL_DST), - 262 => ::std::option::Option::Some(CircuitRelay_Status::HOP_CANT_OPEN_DST_STREAM), - 270 => ::std::option::Option::Some(CircuitRelay_Status::HOP_CANT_SPEAK_RELAY), - 280 => ::std::option::Option::Some(CircuitRelay_Status::HOP_CANT_RELAY_TO_SELF), - 320 => ::std::option::Option::Some(CircuitRelay_Status::STOP_SRC_ADDR_TOO_LONG), - 321 => ::std::option::Option::Some(CircuitRelay_Status::STOP_DST_ADDR_TOO_LONG), - 350 => ::std::option::Option::Some(CircuitRelay_Status::STOP_SRC_MULTIADDR_INVALID), - 351 => ::std::option::Option::Some(CircuitRelay_Status::STOP_DST_MULTIADDR_INVALID), - 390 => ::std::option::Option::Some(CircuitRelay_Status::STOP_RELAY_REFUSED), - 400 => ::std::option::Option::Some(CircuitRelay_Status::MALFORMED_MESSAGE), - _ => ::std::option::Option::None - } - } - - fn values() -> &'static [Self] { - static values: &'static [CircuitRelay_Status] = &[ - CircuitRelay_Status::SUCCESS, - CircuitRelay_Status::HOP_SRC_ADDR_TOO_LONG, - CircuitRelay_Status::HOP_DST_ADDR_TOO_LONG, - CircuitRelay_Status::HOP_SRC_MULTIADDR_INVALID, - CircuitRelay_Status::HOP_DST_MULTIADDR_INVALID, - CircuitRelay_Status::HOP_NO_CONN_TO_DST, - CircuitRelay_Status::HOP_CANT_DIAL_DST, - CircuitRelay_Status::HOP_CANT_OPEN_DST_STREAM, - CircuitRelay_Status::HOP_CANT_SPEAK_RELAY, - CircuitRelay_Status::HOP_CANT_RELAY_TO_SELF, - CircuitRelay_Status::STOP_SRC_ADDR_TOO_LONG, - CircuitRelay_Status::STOP_DST_ADDR_TOO_LONG, - CircuitRelay_Status::STOP_SRC_MULTIADDR_INVALID, - CircuitRelay_Status::STOP_DST_MULTIADDR_INVALID, - CircuitRelay_Status::STOP_RELAY_REFUSED, - CircuitRelay_Status::MALFORMED_MESSAGE, - ]; - values - } - - fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { - static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::EnumDescriptor> = ::protobuf::lazy::Lazy { - lock: ::protobuf::lazy::ONCE_INIT, - ptr: 0 as *const ::protobuf::reflect::EnumDescriptor, - }; - unsafe { - descriptor.get(|| { - ::protobuf::reflect::EnumDescriptor::new("CircuitRelay_Status", file_descriptor_proto()) - }) - } - } -} - -impl ::std::marker::Copy for CircuitRelay_Status { -} - -impl ::protobuf::reflect::ProtobufValue for CircuitRelay_Status { - fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { - ::protobuf::reflect::ProtobufValueRef::Enum(self.descriptor()) - } -} - -#[derive(Clone,PartialEq,Eq,Debug,Hash)] -pub enum CircuitRelay_Type { - HOP = 1, - STOP = 2, - STATUS = 3, - CAN_HOP = 4, -} - -impl ::protobuf::ProtobufEnum for CircuitRelay_Type { - fn value(&self) -> i32 { - *self as i32 - } - - fn from_i32(value: i32) -> ::std::option::Option { - match value { - 1 => ::std::option::Option::Some(CircuitRelay_Type::HOP), - 2 => ::std::option::Option::Some(CircuitRelay_Type::STOP), - 3 => ::std::option::Option::Some(CircuitRelay_Type::STATUS), - 4 => ::std::option::Option::Some(CircuitRelay_Type::CAN_HOP), - _ => ::std::option::Option::None - } - } - - fn values() -> &'static [Self] { - static values: &'static [CircuitRelay_Type] = &[ - CircuitRelay_Type::HOP, - CircuitRelay_Type::STOP, - CircuitRelay_Type::STATUS, - CircuitRelay_Type::CAN_HOP, - ]; - values - } - - fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor { - static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::EnumDescriptor> = ::protobuf::lazy::Lazy { - lock: ::protobuf::lazy::ONCE_INIT, - ptr: 0 as *const ::protobuf::reflect::EnumDescriptor, - }; - unsafe { - descriptor.get(|| { - ::protobuf::reflect::EnumDescriptor::new("CircuitRelay_Type", file_descriptor_proto()) - }) - } - } -} - -impl ::std::marker::Copy for CircuitRelay_Type { -} - -impl ::protobuf::reflect::ProtobufValue for CircuitRelay_Type { - fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { - ::protobuf::reflect::ProtobufValueRef::Enum(self.descriptor()) - } -} - -static file_descriptor_proto_data: &'static [u8] = b"\ - \n\rmessage.proto\"\xe3\x05\n\x0cCircuitRelay\x12&\n\x04type\x18\x01\x20\ - \x01(\x0e2\x12.CircuitRelay.TypeR\x04type\x12,\n\x07srcPeer\x18\x02\x20\ - \x01(\x0b2\x12.CircuitRelay.PeerR\x07srcPeer\x12,\n\x07dstPeer\x18\x03\ - \x20\x01(\x0b2\x12.CircuitRelay.PeerR\x07dstPeer\x12(\n\x04code\x18\x04\ - \x20\x01(\x0e2\x14.CircuitRelay.StatusR\x04code\x1a,\n\x04Peer\x12\x0e\n\ - \x02id\x18\x01\x20\x02(\x0cR\x02id\x12\x14\n\x05addrs\x18\x02\x20\x03(\ - \x0cR\x05addrs\"\xc2\x03\n\x06Status\x12\x0b\n\x07SUCCESS\x10d\x12\x1a\n\ - \x15HOP_SRC_ADDR_TOO_LONG\x10\xdc\x01\x12\x1a\n\x15HOP_DST_ADDR_TOO_LONG\ - \x10\xdd\x01\x12\x1e\n\x19HOP_SRC_MULTIADDR_INVALID\x10\xfa\x01\x12\x1e\ - \n\x19HOP_DST_MULTIADDR_INVALID\x10\xfb\x01\x12\x17\n\x12HOP_NO_CONN_TO_\ - DST\x10\x84\x02\x12\x16\n\x11HOP_CANT_DIAL_DST\x10\x85\x02\x12\x1d\n\x18\ - HOP_CANT_OPEN_DST_STREAM\x10\x86\x02\x12\x19\n\x14HOP_CANT_SPEAK_RELAY\ - \x10\x8e\x02\x12\x1b\n\x16HOP_CANT_RELAY_TO_SELF\x10\x98\x02\x12\x1b\n\ - \x16STOP_SRC_ADDR_TOO_LONG\x10\xc0\x02\x12\x1b\n\x16STOP_DST_ADDR_TOO_LO\ - NG\x10\xc1\x02\x12\x1f\n\x1aSTOP_SRC_MULTIADDR_INVALID\x10\xde\x02\x12\ - \x1f\n\x1aSTOP_DST_MULTIADDR_INVALID\x10\xdf\x02\x12\x17\n\x12STOP_RELAY\ - _REFUSED\x10\x86\x03\x12\x16\n\x11MALFORMED_MESSAGE\x10\x90\x03\"2\n\x04\ - Type\x12\x07\n\x03HOP\x10\x01\x12\x08\n\x04STOP\x10\x02\x12\n\n\x06STATU\ - S\x10\x03\x12\x0b\n\x07CAN_HOP\x10\x04J\xbc\r\n\x06\x12\x04\0\0)\x01\n\ - \x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0)\x01\n\n\n\ - \x03\x04\0\x01\x12\x03\x02\x08\x14\n\x0c\n\x04\x04\0\x04\0\x12\x04\x04\ - \x04\x15\x05\n\x0c\n\x05\x04\0\x04\0\x01\x12\x03\x04\t\x0f\n\r\n\x06\x04\ - \0\x04\0\x02\0\x12\x03\x05\x08)\n\x0e\n\x07\x04\0\x04\0\x02\0\x01\x12\ - \x03\x05\x08\x0f\n\x0e\n\x07\x04\0\x04\0\x02\0\x02\x12\x03\x05%(\n\r\n\ - \x06\x04\0\x04\0\x02\x01\x12\x03\x06\x08)\n\x0e\n\x07\x04\0\x04\0\x02\ - \x01\x01\x12\x03\x06\x08\x1d\n\x0e\n\x07\x04\0\x04\0\x02\x01\x02\x12\x03\ - \x06%(\n\r\n\x06\x04\0\x04\0\x02\x02\x12\x03\x07\x08)\n\x0e\n\x07\x04\0\ - \x04\0\x02\x02\x01\x12\x03\x07\x08\x1d\n\x0e\n\x07\x04\0\x04\0\x02\x02\ - \x02\x12\x03\x07%(\n\r\n\x06\x04\0\x04\0\x02\x03\x12\x03\x08\x08)\n\x0e\ - \n\x07\x04\0\x04\0\x02\x03\x01\x12\x03\x08\x08!\n\x0e\n\x07\x04\0\x04\0\ - \x02\x03\x02\x12\x03\x08%(\n\r\n\x06\x04\0\x04\0\x02\x04\x12\x03\t\x08)\ - \n\x0e\n\x07\x04\0\x04\0\x02\x04\x01\x12\x03\t\x08!\n\x0e\n\x07\x04\0\ - \x04\0\x02\x04\x02\x12\x03\t%(\n\r\n\x06\x04\0\x04\0\x02\x05\x12\x03\n\ - \x08)\n\x0e\n\x07\x04\0\x04\0\x02\x05\x01\x12\x03\n\x08\x1a\n\x0e\n\x07\ - \x04\0\x04\0\x02\x05\x02\x12\x03\n%(\n\r\n\x06\x04\0\x04\0\x02\x06\x12\ - \x03\x0b\x08)\n\x0e\n\x07\x04\0\x04\0\x02\x06\x01\x12\x03\x0b\x08\x19\n\ - \x0e\n\x07\x04\0\x04\0\x02\x06\x02\x12\x03\x0b%(\n\r\n\x06\x04\0\x04\0\ - \x02\x07\x12\x03\x0c\x08)\n\x0e\n\x07\x04\0\x04\0\x02\x07\x01\x12\x03\ - \x0c\x08\x20\n\x0e\n\x07\x04\0\x04\0\x02\x07\x02\x12\x03\x0c%(\n\r\n\x06\ - \x04\0\x04\0\x02\x08\x12\x03\r\x08)\n\x0e\n\x07\x04\0\x04\0\x02\x08\x01\ - \x12\x03\r\x08\x1c\n\x0e\n\x07\x04\0\x04\0\x02\x08\x02\x12\x03\r%(\n\r\n\ - \x06\x04\0\x04\0\x02\t\x12\x03\x0e\x08)\n\x0e\n\x07\x04\0\x04\0\x02\t\ - \x01\x12\x03\x0e\x08\x1e\n\x0e\n\x07\x04\0\x04\0\x02\t\x02\x12\x03\x0e%(\ - \n\r\n\x06\x04\0\x04\0\x02\n\x12\x03\x0f\x08)\n\x0e\n\x07\x04\0\x04\0\ - \x02\n\x01\x12\x03\x0f\x08\x1e\n\x0e\n\x07\x04\0\x04\0\x02\n\x02\x12\x03\ - \x0f%(\n\r\n\x06\x04\0\x04\0\x02\x0b\x12\x03\x10\x08)\n\x0e\n\x07\x04\0\ - \x04\0\x02\x0b\x01\x12\x03\x10\x08\x1e\n\x0e\n\x07\x04\0\x04\0\x02\x0b\ - \x02\x12\x03\x10%(\n\r\n\x06\x04\0\x04\0\x02\x0c\x12\x03\x11\x08)\n\x0e\ - \n\x07\x04\0\x04\0\x02\x0c\x01\x12\x03\x11\x08\"\n\x0e\n\x07\x04\0\x04\0\ - \x02\x0c\x02\x12\x03\x11%(\n\r\n\x06\x04\0\x04\0\x02\r\x12\x03\x12\x08)\ - \n\x0e\n\x07\x04\0\x04\0\x02\r\x01\x12\x03\x12\x08\"\n\x0e\n\x07\x04\0\ - \x04\0\x02\r\x02\x12\x03\x12%(\n\r\n\x06\x04\0\x04\0\x02\x0e\x12\x03\x13\ - \x08)\n\x0e\n\x07\x04\0\x04\0\x02\x0e\x01\x12\x03\x13\x08\x1a\n\x0e\n\ - \x07\x04\0\x04\0\x02\x0e\x02\x12\x03\x13%(\n\r\n\x06\x04\0\x04\0\x02\x0f\ - \x12\x03\x14\x08)\n\x0e\n\x07\x04\0\x04\0\x02\x0f\x01\x12\x03\x14\x08\ - \x19\n\x0e\n\x07\x04\0\x04\0\x02\x0f\x02\x12\x03\x14%(\n:\n\x04\x04\0\ - \x04\x01\x12\x04\x17\x04\x1c\x05\",\x20RPC\x20identifier,\x20either\x20H\ - OP,\x20STOP\x20or\x20STATUS\n\n\x0c\n\x05\x04\0\x04\x01\x01\x12\x03\x17\ - \t\r\n\r\n\x06\x04\0\x04\x01\x02\0\x12\x03\x18\x08\x10\n\x0e\n\x07\x04\0\ - \x04\x01\x02\0\x01\x12\x03\x18\x08\x0b\n\x0e\n\x07\x04\0\x04\x01\x02\0\ - \x02\x12\x03\x18\x0e\x0f\n\r\n\x06\x04\0\x04\x01\x02\x01\x12\x03\x19\x08\ - \x11\n\x0e\n\x07\x04\0\x04\x01\x02\x01\x01\x12\x03\x19\x08\x0c\n\x0e\n\ - \x07\x04\0\x04\x01\x02\x01\x02\x12\x03\x19\x0f\x10\n\r\n\x06\x04\0\x04\ - \x01\x02\x02\x12\x03\x1a\x08\x13\n\x0e\n\x07\x04\0\x04\x01\x02\x02\x01\ - \x12\x03\x1a\x08\x0e\n\x0e\n\x07\x04\0\x04\x01\x02\x02\x02\x12\x03\x1a\ - \x11\x12\n!\n\x06\x04\0\x04\x01\x02\x03\x12\x03\x1b\x08\x14\"\x12\x20is\ - \x20peer\x20a\x20relay?\n\n\x0e\n\x07\x04\0\x04\x01\x02\x03\x01\x12\x03\ - \x1b\x08\x0f\n\x0e\n\x07\x04\0\x04\x01\x02\x03\x02\x12\x03\x1b\x12\x13\n\ - \x0c\n\x04\x04\0\x03\0\x12\x04\x1e\x04!\x05\n\x0c\n\x05\x04\0\x03\0\x01\ - \x12\x03\x1e\x0c\x10\n\x18\n\x06\x04\0\x03\0\x02\0\x12\x03\x1f\x08\x1e\"\ - \t\x20peer\x20id\n\n\x0e\n\x07\x04\0\x03\0\x02\0\x04\x12\x03\x1f\x08\x10\ - \n\x0e\n\x07\x04\0\x03\0\x02\0\x05\x12\x03\x1f\x11\x16\n\x0e\n\x07\x04\0\ - \x03\0\x02\0\x01\x12\x03\x1f\x17\x19\n\x0e\n\x07\x04\0\x03\0\x02\0\x03\ - \x12\x03\x1f\x1c\x1d\n'\n\x06\x04\0\x03\0\x02\x01\x12\x03\x20\x08!\"\x18\ - \x20peer's\x20known\x20addresses\n\n\x0e\n\x07\x04\0\x03\0\x02\x01\x04\ - \x12\x03\x20\x08\x10\n\x0e\n\x07\x04\0\x03\0\x02\x01\x05\x12\x03\x20\x11\ - \x16\n\x0e\n\x07\x04\0\x03\0\x02\x01\x01\x12\x03\x20\x17\x1c\n\x0e\n\x07\ - \x04\0\x03\0\x02\x01\x03\x12\x03\x20\x1f\x20\n\"\n\x04\x04\0\x02\0\x12\ - \x03#\x04\x1b\"\x15\x20Type\x20of\x20the\x20message\n\n\x0c\n\x05\x04\0\ - \x02\0\x04\x12\x03#\x04\x0c\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03#\r\x11\n\ - \x0c\n\x05\x04\0\x02\0\x01\x12\x03#\x12\x16\n\x0c\n\x05\x04\0\x02\0\x03\ - \x12\x03#\x19\x1a\nD\n\x04\x04\0\x02\x01\x12\x03%\x04\x1e\"7\x20srcPeer\ - \x20and\x20dstPeer\x20are\x20used\x20when\x20Type\x20is\x20HOP\x20or\x20\ - STOP\n\n\x0c\n\x05\x04\0\x02\x01\x04\x12\x03%\x04\x0c\n\x0c\n\x05\x04\0\ - \x02\x01\x06\x12\x03%\r\x11\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03%\x12\ - \x19\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03%\x1c\x1d\n\x0b\n\x04\x04\0\ - \x02\x02\x12\x03&\x04\x1e\n\x0c\n\x05\x04\0\x02\x02\x04\x12\x03&\x04\x0c\ - \n\x0c\n\x05\x04\0\x02\x02\x06\x12\x03&\r\x11\n\x0c\n\x05\x04\0\x02\x02\ - \x01\x12\x03&\x12\x19\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03&\x1c\x1d\n4\ - \n\x04\x04\0\x02\x03\x12\x03(\x04\x1d\"'\x20Status\x20code,\x20used\x20w\ - hen\x20Type\x20is\x20STATUS\n\n\x0c\n\x05\x04\0\x02\x03\x04\x12\x03(\x04\ - \x0c\n\x0c\n\x05\x04\0\x02\x03\x06\x12\x03(\r\x13\n\x0c\n\x05\x04\0\x02\ - \x03\x01\x12\x03(\x14\x18\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03(\x1b\x1c\ -"; - -static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy { - lock: ::protobuf::lazy::ONCE_INIT, - ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto, -}; - -fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { - ::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap() -} - -pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { - unsafe { - file_descriptor_proto_lazy.get(|| { - parse_descriptor_proto() - }) - } -} diff --git a/transports/relay/src/protocol.rs b/transports/relay/src/protocol.rs deleted file mode 100644 index 976262b5..00000000 --- a/transports/relay/src/protocol.rs +++ /dev/null @@ -1,322 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use bytes::Bytes; -use crate::{ - copy, - error::RelayError, - message::{CircuitRelay, CircuitRelay_Peer, CircuitRelay_Status, CircuitRelay_Type}, - utility::{io_err, is_success, status, Io, Peer} -}; -use futures::{stream, future::{self, Either::{A, B}, FutureResult}, prelude::*}; -use libp2p_core::{ - transport::Transport, - upgrade::{apply_outbound, InboundUpgrade, OutboundUpgrade, UpgradeInfo} -}; -use log::debug; -use peerstore::{PeerAccess, PeerId, Peerstore}; -use std::{io, iter, ops::Deref}; -use tokio_io::{AsyncRead, AsyncWrite}; -use void::Void; - -#[derive(Debug, Clone)] -pub struct RelayConfig { - my_id: PeerId, - dialer: T, - peers: P, - // If `allow_relays` is false this node can only be used as a - // destination but will not allow relaying streams to other - // destinations. - allow_relays: bool -} - -// The `RelayConfig` upgrade can serve as destination or relay. Each mode needs a different -// output type. As destination we want the stream to continue to be usable, whereas as relay -// we pipe data from source to destination and do not want to use the stream in any other way. -// Therefore, in the latter case we simply return a future that can be driven to completion -// but otherwise the stream is not programmatically accessible. -pub enum Output { - Stream(C), - Sealed(Box + Send>) -} - -impl UpgradeInfo for RelayConfig { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; - - fn protocol_names(&self) -> Self::NamesIter { - iter::once((Bytes::from("/libp2p/relay/circuit/0.1.0"), ())) - } -} - -impl InboundUpgrade for RelayConfig -where - C: AsyncRead + AsyncWrite + Send + 'static, - T: Transport + Clone + Send + 'static, - T::Dial: Send, - T::Listener: Send, - T::ListenerUpgrade: Send, - T::Output: AsyncRead + AsyncWrite + Send, - P: Deref + Clone + Send + 'static, - S: 'static, - for<'a> &'a S: Peerstore -{ - type Output = Output; - type Error = RelayError; - type Future = Box + Send>; - - fn upgrade_inbound(self, conn: C, _: ()) -> Self::Future { - let future = Io::new(conn).recv().from_err().and_then(move |(message, io)| { - let msg = if let Some(m) = message { - m - } else { - return A(A(future::err(RelayError::Message("no message received")))) - }; - match msg.get_field_type() { - CircuitRelay_Type::HOP if self.allow_relays => { // act as relay - B(A(self.on_hop(msg, io).map(|fut| Output::Sealed(Box::new(fut))))) - } - CircuitRelay_Type::STOP => { // act as destination - B(B(self.on_stop(msg, io).from_err().map(Output::Stream))) - } - other => { - debug!("invalid message type: {:?}", other); - let resp = status(CircuitRelay_Status::MALFORMED_MESSAGE); - A(B(io.send(resp).from_err().and_then(|_| Err(RelayError::Message("invalid message type"))))) - } - } - }); - Box::new(future) - } -} - -impl RelayConfig -where - T: Transport + Clone + 'static, - T::Dial: Send, // TODO: remove - T::Listener: Send, // TODO: remove - T::ListenerUpgrade: Send, // TODO: remove - T::Output: Send + AsyncRead + AsyncWrite, - P: Deref + Clone + 'static, - for<'a> &'a S: Peerstore, -{ - pub fn new(my_id: PeerId, dialer: T, peers: P) -> RelayConfig { - RelayConfig { my_id, dialer, peers, allow_relays: true } - } - - pub fn allow_relays(&mut self, val: bool) { - self.allow_relays = val - } - - // HOP message handling (relay mode). - fn on_hop(self, mut msg: CircuitRelay, io: Io) -> impl Future, Error=RelayError> - where - C: AsyncRead + AsyncWrite + 'static, - { - let from = if let Some(peer) = Peer::from_message(msg.take_srcPeer()) { - peer - } else { - let msg = status(CircuitRelay_Status::HOP_SRC_MULTIADDR_INVALID); - return A(io.send(msg).from_err().and_then(|_| Err(RelayError::Message("invalid src address")))) - }; - - let mut dest = if let Some(peer) = Peer::from_message(msg.take_dstPeer()) { - peer - } else { - let msg = status(CircuitRelay_Status::HOP_DST_MULTIADDR_INVALID); - return B(A(io.send(msg).from_err().and_then(|_| Err(RelayError::Message("invalid dest address"))))) - }; - - if dest.addrs.is_empty() { - // Add locally know addresses of destination - if let Some(peer) = self.peers.peer(&dest.id) { - dest.addrs.extend(peer.addrs()) - } - } - - let stop = stop_message(&from, &dest); - - let dialer = self.dialer; - let future = stream::iter_ok(dest.addrs.into_iter()) - .and_then(move |dest_addr| { - dialer.clone().dial(dest_addr).map_err(|_| RelayError::Message("could no dial addr")) - }) - .and_then(|outbound| outbound.from_err().and_then(|c| apply_outbound(c, TrivialUpgrade).from_err())) - .then(|result| Ok(result.ok())) - .filter_map(|result| result) - .into_future() - .map_err(|(err, _stream)| err) - .and_then(move |(ok, _stream)| { - if let Some(c) = ok { - // send STOP message to destination and expect back a SUCCESS message - let future = Io::new(c) - .send(stop) - .and_then(Io::recv) - .from_err() - .and_then(|(response, io)| { - let rsp = match response { - Some(m) => m, - None => return Err(RelayError::Message("no message from destination")) - }; - if is_success(&rsp) { - Ok(io.into()) - } else { - Err(RelayError::Message("no success response from relay")) - } - }); - A(future) - } else { - B(future::err(RelayError::Message("could not dial peer"))) - } - }) - // signal success or failure to source - .then(move |result| { - match result { - Ok(c) => { - let msg = status(CircuitRelay_Status::SUCCESS); - A(io.send(msg).map(|io| (io.into(), c)).from_err()) - } - Err(e) => { - let msg = status(CircuitRelay_Status::HOP_CANT_DIAL_DST); - B(io.send(msg).from_err().and_then(|_| Err(e))) - } - } - }) - // return future for bidirectional data transfer - .and_then(move |(src, dst)| { - let future = { - let (src_r, src_w) = src.split(); - let (dst_r, dst_w) = dst.split(); - let a = copy::flushing_copy(src_r, dst_w).map(|_| ()); - let b = copy::flushing_copy(dst_r, src_w).map(|_| ()); - a.select(b).map(|_| ()).map_err(|(e, _)| e) - }; - Ok(future) - }); - - B(B(future)) - } - - // STOP message handling (destination mode) - fn on_stop(self, mut msg: CircuitRelay, io: Io) -> impl Future - where - C: AsyncRead + AsyncWrite + 'static, - { - let dest = if let Some(peer) = Peer::from_message(msg.take_dstPeer()) { - peer - } else { - let msg = status(CircuitRelay_Status::STOP_DST_MULTIADDR_INVALID); - return A(io.send(msg).and_then(|_| Err(io_err("invalid dest address")))) - }; - - if dest.id != self.my_id { - let msg = status(CircuitRelay_Status::STOP_RELAY_REFUSED); - return B(A(io.send(msg).and_then(|_| Err(io_err("destination id mismatch"))))) - } - - B(B(io.send(status(CircuitRelay_Status::SUCCESS)).map(Io::into))) - } -} - -fn stop_message(from: &Peer, dest: &Peer) -> CircuitRelay { - let mut msg = CircuitRelay::new(); - msg.set_field_type(CircuitRelay_Type::STOP); - - let mut f = CircuitRelay_Peer::new(); - f.set_id(from.id.as_bytes().to_vec()); - for a in &from.addrs { - f.mut_addrs().push(a.to_bytes()) - } - msg.set_srcPeer(f); - - let mut d = CircuitRelay_Peer::new(); - d.set_id(dest.id.as_bytes().to_vec()); - for a in &dest.addrs { - d.mut_addrs().push(a.to_bytes()) - } - msg.set_dstPeer(d); - - msg -} - -#[derive(Debug, Clone)] -struct TrivialUpgrade; - -impl UpgradeInfo for TrivialUpgrade { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; - - fn protocol_names(&self) -> Self::NamesIter { - iter::once((Bytes::from("/libp2p/relay/circuit/0.1.0"), ())) - } -} - -impl OutboundUpgrade for TrivialUpgrade -where - C: AsyncRead + AsyncWrite + 'static -{ - type Output = C; - type Error = Void; - type Future = FutureResult; - - fn upgrade_outbound(self, conn: C, _: ()) -> Self::Future { - future::ok(conn) - } -} - -#[derive(Debug, Clone)] -pub(crate) struct Source(pub(crate) CircuitRelay); - -impl UpgradeInfo for Source { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; - - fn protocol_names(&self) -> Self::NamesIter { - iter::once((Bytes::from("/libp2p/relay/circuit/0.1.0"), ())) - } -} - -impl OutboundUpgrade for Source -where - C: AsyncRead + AsyncWrite + Send + 'static, -{ - type Output = C; - type Error = io::Error; - type Future = Box + Send>; - - fn upgrade_outbound(self, conn: C, _: ()) -> Self::Future { - let future = Io::new(conn) - .send(self.0) - .and_then(Io::recv) - .and_then(|(response, io)| { - let rsp = match response { - Some(m) => m, - None => return Err(io_err("no message from relay")), - }; - if is_success(&rsp) { - Ok(io.into()) - } else { - Err(io_err("no success response from relay")) - } - }); - Box::new(future) - } -} - diff --git a/transports/relay/src/transport.rs b/transports/relay/src/transport.rs deleted file mode 100644 index ef52cb42..00000000 --- a/transports/relay/src/transport.rs +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::{ - error::RelayError, - message::{CircuitRelay, CircuitRelay_Peer, CircuitRelay_Type}, - protocol, - utility::{Peer, RelayAddr} -}; -use futures::{future, stream, prelude::*}; -use libp2p_core::{transport::Transport, upgrade::apply_outbound}; -use log::{debug, info, trace}; -use multiaddr::Multiaddr; -use peerstore::{PeerAccess, PeerId, Peerstore}; -use rand::prelude::*; -use std::{io, iter::FromIterator, ops::Deref, sync::Arc}; -use tokio_io::{AsyncRead, AsyncWrite}; - -#[derive(Debug, Clone)] -pub struct RelayTransport { - my_id: PeerId, - transport: T, - peers: P, - relays: Arc> -} - -impl Transport for RelayTransport -where - T: Transport + Send + Clone + 'static, - T::Dial: Send, - T::Output: AsyncRead + AsyncWrite + Send, - P: Deref + Clone + 'static, - S: 'static, - for<'a> &'a S: Peerstore -{ - type Output = T::Output; - type Listener = stream::Empty<(Self::ListenerUpgrade, Multiaddr), io::Error>; - type ListenerUpgrade = future::Empty; - type Dial = Box + Send>; - - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - Err((self, addr)) - } - - fn dial(self, addr: Multiaddr) -> Result { - match RelayAddr::parse(&addr) { - RelayAddr::Malformed => { - debug!("malformed address: {}", addr); - return Err((self, addr)); - } - RelayAddr::Multihop => { - debug!("multihop address: {}", addr); - return Err((self, addr)); - } - RelayAddr::Address { relay, dest } => { - if let Some(ref r) = relay { - let f = self.relay_via(r, &dest).map_err(|this| (this, addr))?; - Ok(Box::new(f.map_err(|e| io::Error::new(io::ErrorKind::Other, e)))) - } else { - let f = self.relay_to(&dest).map_err(|this| (this, addr))?; - Ok(Box::new(f.map_err(|e| io::Error::new(io::ErrorKind::Other, e)))) - } - } - } - } - - fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option { - self.transport.nat_traversal(a, b) - } -} - -impl RelayTransport -where - T: Transport + Clone + 'static, - T::Dial: Send, - T::Output: AsyncRead + AsyncWrite + Send, - P: Deref + Clone + 'static, - for<'a> &'a S: Peerstore -{ - /// Create a new relay transport. - /// - /// This transport uses a static set of relays and will not attempt - /// any relay discovery. - pub fn new(my_id: PeerId, transport: T, peers: P, relays: R) -> Self - where - R: IntoIterator, - { - RelayTransport { - my_id, - transport, - peers, - relays: Arc::new(Vec::from_iter(relays)), - } - } - - // Relay to destination over any available relay node. - fn relay_to(self, destination: &Peer) -> Result>, Self> { - trace!("relay_to {:?}", destination.id); - let mut dials = Vec::new(); - for relay in &*self.relays { - let relay_peer = Peer { - id: relay.clone(), - addrs: Vec::new(), - }; - if let Ok(dial) = self.clone().relay_via(&relay_peer, destination) { - dials.push(dial) - } - } - - if dials.is_empty() { - info!("no relay available for {:?}", destination.id); - return Err(self); - } - - // Try one relay after another and stick to the first working one. - dials.shuffle(&mut thread_rng()); // randomise to spread load - let dest_peer = destination.id.clone(); - let future = stream::iter_ok(dials.into_iter()) - .and_then(|dial| dial) - .then(|result| Ok(result.ok())) - .filter_map(|result| result) - .into_future() - .map_err(|(err, _stream)| err) - .and_then(move |(ok, _stream)| { - if let Some(out) = ok { - Ok(out) - } else { - Err(RelayError::NoRelayFor(dest_peer)) - } - }); - Ok(future) - } - - // Relay to destination via the given peer. - fn relay_via(self, relay: &Peer, destination: &Peer) -> Result>, Self> { - trace!("relay_via {:?} to {:?}", relay.id, destination.id); - let mut addresses = Vec::new(); - - if relay.addrs.is_empty() { - // try all known relay addresses - if let Some(peer) = self.peers.peer(&relay.id) { - addresses.extend(peer.addrs()) - } - } else { - // use only specific relay addresses - addresses.extend(relay.addrs.iter().cloned()) - } - - // no relay address => bail out - if addresses.is_empty() { - info!("no available address for relay: {:?}", relay.id); - return Err(self); - } - - let relay = relay.clone(); - let message = self.hop_message(destination); - let upgrade = protocol::Source(message); - let dialer = self.transport; - let future = stream::iter_ok(addresses.into_iter()) - .filter_map(move |addr| dialer.clone().dial(addr).ok()) - .and_then(move |dial| { - let upgrade = upgrade.clone(); - dial.map_err(|_| RelayError::Message("could not dial")) - .and_then(move |c| apply_outbound(c, upgrade).from_err()) - }) - .then(|result| Ok(result.ok())) - .filter_map(|result| result) - .into_future() - .map_err(|(err, _stream)| err) - .and_then(move |(ok, _stream)| match ok { - Some(out) => { - debug!("connected"); - Ok(out) - } - None => { - info!("failed to dial to {:?}", relay.id); - Err(RelayError::Message("failed to dial to relay")) - } - }); - Ok(future) - } - - fn hop_message(&self, destination: &Peer) -> CircuitRelay { - let mut msg = CircuitRelay::new(); - msg.set_field_type(CircuitRelay_Type::HOP); - - let mut from = CircuitRelay_Peer::new(); - from.set_id(self.my_id.as_bytes().to_vec()); - if let Some(me) = self.peers.peer(&self.my_id) { - for a in me.addrs() { - from.mut_addrs().push(a.to_bytes()) - } - } - msg.set_srcPeer(from); - - let mut dest = CircuitRelay_Peer::new(); - dest.set_id(destination.id.as_bytes().to_vec()); - for a in &destination.addrs { - dest.mut_addrs().push(a.to_bytes()) - } - msg.set_dstPeer(dest); - - msg - } -} diff --git a/transports/relay/src/utility.rs b/transports/relay/src/utility.rs deleted file mode 100644 index f78c17f0..00000000 --- a/transports/relay/src/utility.rs +++ /dev/null @@ -1,183 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::message::{CircuitRelay, CircuitRelay_Peer, CircuitRelay_Status, CircuitRelay_Type}; -use futures::{future::{self, Either}, prelude::*}; -use log::trace; -use multiaddr::{Protocol, Multiaddr}; -use peerstore::PeerId; -use protobuf::{self, Message}; -use std::{io, error::Error, iter::FromIterator}; -use tokio_codec::Framed; -use tokio_io::{AsyncRead, AsyncWrite}; -use unsigned_varint::codec; - -pub(crate) fn is_success(msg: &CircuitRelay) -> bool { - msg.get_field_type() == CircuitRelay_Type::STATUS - && msg.get_code() == CircuitRelay_Status::SUCCESS -} - -pub(crate) fn status(s: CircuitRelay_Status) -> CircuitRelay { - let mut msg = CircuitRelay::new(); - msg.set_field_type(CircuitRelay_Type::STATUS); - msg.set_code(s); - msg -} - -pub(crate) struct Io { - codec: Framed>>, -} - -impl Io { - pub(crate) fn new(c: T) -> Io { - Io { - codec: Framed::new(c, codec::UviBytes::default()), - } - } - - pub(crate) fn into(self) -> T { - self.codec.into_inner() - } -} - -impl Io -where - T: AsyncRead + AsyncWrite + 'static, -{ - pub(crate) fn send(self, msg: CircuitRelay) -> impl Future { - trace!("sending protocol message: type={:?}, code={:?}", - msg.get_field_type(), - msg.get_code()); - let pkg = match msg.write_to_bytes() { - Ok(p) => p, - Err(e) => return Either::A(future::err(io_err(e))) - }; - Either::B(self.codec.send(pkg).map(|codec| Io { codec })) - } - - pub(crate) fn recv(self) -> impl Future, Self), Error=io::Error> { - self.codec - .into_future() - .map_err(|(e, _)| io_err(e)) - .and_then(|(pkg, codec)| { - if let Some(ref p) = pkg { - protobuf::parse_from_bytes(p) - .map(|msg: CircuitRelay| { - trace!("received protocol message: type={:?}, code={:?}", - msg.get_field_type(), - msg.get_code()); - (Some(msg), Io { codec }) - }) - .map_err(io_err) - } else { - Ok((None, Io { codec })) - } - }) - } -} - -pub(crate) enum RelayAddr { - Address { relay: Option, dest: Peer }, - Malformed, - Multihop, // Unsupported -} - -impl RelayAddr { - // Address format: []/p2p-circuit/ - pub(crate) fn parse(addr: &Multiaddr) -> RelayAddr { - let mut iter = addr.iter().peekable(); - - let relay = if let Some(&Protocol::P2pCircuit) = iter.peek() { - None // Address begins with "p2p-circuit", i.e. no relay is specified. - } else { - let prefix = iter.by_ref().take_while(|p| *p != Protocol::P2pCircuit); - match Peer::from(Multiaddr::from_iter(prefix)) { - None => return RelayAddr::Malformed, - peer => peer, - } - }; - - // After the (optional) relay, "p2p-circuit" is expected. - if Some(Protocol::P2pCircuit) != iter.next() { - return RelayAddr::Malformed; - } - - let dest = { - let suffix = iter.by_ref().take_while(|p| *p != Protocol::P2pCircuit); - match Peer::from(Multiaddr::from_iter(suffix)) { - None => return RelayAddr::Malformed, - Some(p) => p, - } - }; - - if iter.next().is_some() { - return RelayAddr::Multihop; - } - - RelayAddr::Address { relay, dest } - } -} - -#[derive(Debug, Clone)] -pub(crate) struct Peer { - pub(crate) id: PeerId, - pub(crate) addrs: Vec, -} - -impl Peer { - pub(crate) fn from(mut addr: Multiaddr) -> Option { - match addr.pop() { - Some(Protocol::P2p(id)) => { - PeerId::from_multihash(id).ok().map(|pid| { - if addr.iter().count() == 0 { - Peer { - id: pid, - addrs: Vec::new(), - } - } else { - Peer { - id: pid, - addrs: vec![addr], - } - } - }) - } - _ => None, - } - } - - pub(crate) fn from_message(mut m: CircuitRelay_Peer) -> Option { - let pid = PeerId::from_bytes(m.take_id()).ok()?; - let mut addrs = Vec::new(); - for a in m.take_addrs().into_iter() { - if let Ok(ma) = Multiaddr::from_bytes(a) { - addrs.push(ma) - } - } - Some(Peer { id: pid, addrs }) - } -} - -pub(crate) fn io_err(e: E) -> io::Error -where - E: Into>, -{ - io::Error::new(io::ErrorKind::Other, e) -}