Remove relay, peerstore and datastore (#723)

This commit is contained in:
Pierre Krieger 2018-12-04 14:52:14 +01:00 committed by GitHub
parent 57ebe697b5
commit 9e0f110e47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 0 additions and 3530 deletions

View File

@ -18,11 +18,9 @@ libp2p-mplex = { path = "./muxers/mplex" }
libp2p-identify = { path = "./protocols/identify" }
libp2p-kad = { path = "./protocols/kad" }
libp2p-floodsub = { path = "./protocols/floodsub" }
libp2p-peerstore = { path = "./stores/peerstore" }
libp2p-ping = { path = "./protocols/ping" }
libp2p-plaintext = { path = "./protocols/plaintext" }
libp2p-ratelimit = { path = "./transports/ratelimit" }
libp2p-relay = { path = "./transports/relay" }
libp2p-core = { path = "./core" }
libp2p-core-derive = { path = "./misc/core-derive" }
libp2p-secio = { path = "./protocols/secio", default-features = false }
@ -67,11 +65,8 @@ members = [
"protocols/ping",
"protocols/plaintext",
"protocols/secio",
"stores/datastore",
"stores/peerstore",
"transports/dns",
"transports/ratelimit",
"transports/relay",
"transports/tcp",
"transports/timeout",
"transports/uds",

View File

@ -8,7 +8,6 @@ license = "MIT"
bytes = "0.4"
fnv = "1"
futures = "0.1"
libp2p-peerstore = { path = "../../stores/peerstore" }
libp2p-core = { path = "../../core" }
log = "0.4.1"
multiaddr = { path = "../../misc/multiaddr" }

View File

@ -69,7 +69,6 @@ extern crate bytes;
extern crate fnv;
#[macro_use]
extern crate futures;
extern crate libp2p_peerstore;
extern crate libp2p_core;
extern crate log;
extern crate multiaddr;

View File

@ -9,7 +9,6 @@ arrayvec = "0.4.7"
bs58 = "0.2.0"
bigint = "4.2"
bytes = "0.4"
datastore = { path = "../../stores/datastore" }
fnv = "1.0"
futures = "0.1"
libp2p-identify = { path = "../../protocols/identify" }

View File

@ -60,7 +60,6 @@ extern crate arrayvec;
extern crate bigint;
extern crate bs58;
extern crate bytes;
extern crate datastore;
extern crate fnv;
#[cfg_attr(test, macro_use)]
extern crate futures;

View File

@ -148,11 +148,9 @@ pub extern crate libp2p_floodsub as floodsub;
pub extern crate libp2p_mplex as mplex;
#[cfg(not(target_os = "emscripten"))]
pub extern crate libp2p_mdns as mdns;
pub extern crate libp2p_peerstore as peerstore;
pub extern crate libp2p_ping as ping;
pub extern crate libp2p_plaintext as plaintext;
pub extern crate libp2p_ratelimit as ratelimit;
pub extern crate libp2p_relay as relay;
pub extern crate libp2p_secio as secio;
#[cfg(not(target_os = "emscripten"))]
pub extern crate libp2p_tcp_transport as tcp;

View File

@ -1,13 +0,0 @@
[package]
name = "datastore"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
[dependencies]
base64 = "0.7"
chashmap = "2.2"
futures = "0.1"
serde = "1.0"
serde_json = "1.0"
tempfile = "3"

View File

@ -1,336 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Implementation of `Datastore` that uses a single plain JSON file for storage.
use Datastore;
use chashmap::{CHashMap, WriteGuard};
use futures::Future;
use futures::stream::{iter_ok, Stream};
use query::{naive_apply_query, Query};
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::value::Value;
use serde_json::{from_reader, from_value, to_value, to_writer, Map};
use std::borrow::Cow;
use std::fs;
use std::io::Cursor;
use std::io::Error as IoError;
use std::io::ErrorKind as IoErrorKind;
use std::io::Read;
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use tempfile::NamedTempFile;
/// Implementation of `Datastore` that uses a single plain JSON file.
pub struct JsonFileDatastore<T>
where
T: Serialize + DeserializeOwned + Clone,
{
path: PathBuf,
content: CHashMap<String, T>,
}
impl<T> JsonFileDatastore<T>
where
T: Serialize + DeserializeOwned + Clone,
{
/// Opens or creates the datastore. If the path refers to an existing path, then this function
/// will attempt to load an existing set of values from it (which can result in an error).
/// Otherwise if the path doesn't exist, the parent directory will be created and a new empty
/// datastore will be returned.
pub fn new<P>(path: P) -> Result<JsonFileDatastore<T>, IoError>
where
P: Into<PathBuf>,
{
let path = path.into();
if !path.exists() {
fs::create_dir_all(path.parent()
.expect("can only fail if root, which means that path.exists() would be true"))?;
return Ok(JsonFileDatastore {
path: path,
content: CHashMap::new(),
});
}
let content = {
let mut file = fs::File::open(&path)?;
// We want to support empty files (and treat them as an empty recordset). Unfortunately
// `serde_json` will always produce an error if we do this ("unexpected EOF at line 0
// column 0"). Therefore we start by reading one byte from the file in order to check
// for EOF.
let mut first_byte = [0];
if file.read(&mut first_byte)? == 0 {
// File is empty.
CHashMap::new()
} else {
match from_reader::<_, Value>(Cursor::new(first_byte).chain(file)) {
Ok(Value::Null) => CHashMap::new(),
Ok(Value::Object(map)) => {
let mut out = CHashMap::with_capacity(map.len());
for (key, value) in map.into_iter() {
let value = match from_value(value) {
Ok(v) => v,
Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)),
};
out.insert(key, value);
}
out
}
Ok(_) => {
return Err(IoError::new(
IoErrorKind::InvalidData,
"expected JSON object",
));
}
Err(err) => {
return Err(IoError::new(IoErrorKind::InvalidData, err));
}
}
}
};
Ok(JsonFileDatastore {
path: path,
content: content,
})
}
/// Flushes the content of the datastore to the disk.
///
/// This function can only fail in case of a disk access error. If an error occurs, any change
/// to the datastore that was performed since the last successful flush will be lost. No data
/// will be corrupted.
pub fn flush(&self) -> Result<(), IoError>
where
T: Clone,
{
// Create a temporary file in the same directory as the destination, which avoids the
// problem of having a file cleaner delete our file while we use it.
let self_path_parent = self.path.parent().ok_or(IoError::new(
IoErrorKind::Other,
"couldn't get parent directory of destination",
))?;
let mut temporary_file = NamedTempFile::new_in(self_path_parent)?;
let content = self.content.clone().into_iter();
to_writer(
&mut temporary_file,
&content
.map(|(k, v)| (k, to_value(v).unwrap()))
.collect::<Map<_, _>>(),
)?;
temporary_file.as_file().sync_data()?;
// Note that `persist` will fail if we try to persist across filesystems. However that
// shouldn't happen since we created the temporary file in the same directory as the final
// path.
temporary_file.persist(&self.path)?;
Ok(())
}
}
impl<'a, T> Datastore<T> for &'a JsonFileDatastore<T>
where
T: Clone + Serialize + DeserializeOwned + Default + PartialOrd + 'static,
{
type Entry = JsonFileDatastoreEntry<'a, T>;
type QueryResult = Box<Stream<Item = (String, T), Error = IoError> + 'a>;
#[inline]
fn lock(self, key: Cow<str>) -> Option<Self::Entry> {
self.content
.get_mut(&key.into_owned())
.map(JsonFileDatastoreEntry)
}
#[inline]
fn lock_or_create(self, key: Cow<str>) -> Self::Entry {
loop {
self.content
.upsert(key.clone().into_owned(), || Default::default(), |_| {});
// There is a slight possibility that another thread will delete our value in this
// small interval. If this happens, we just loop and reinsert the value again until
// we can acquire a lock.
if let Some(v) = self.content.get_mut(&key.clone().into_owned()) {
return JsonFileDatastoreEntry(v);
}
}
}
#[inline]
fn put(self, key: Cow<str>, value: T) {
self.content.insert(key.into_owned(), value);
}
#[inline]
fn get(self, key: &str) -> Option<T> {
self.content.get(&key.to_owned()).map(|v| v.clone())
}
#[inline]
fn has(self, key: &str) -> bool {
self.content.contains_key(&key.to_owned())
}
#[inline]
fn delete(self, key: &str) -> Option<T> {
self.content.remove(&key.to_owned())
}
fn query(self, query: Query<T>) -> Self::QueryResult {
let content = self.content.clone();
let keys_only = query.keys_only;
let content_stream = iter_ok(content.into_iter().filter_map(|(key, value)| {
// Skip values that are malformed.
let value = if keys_only { Default::default() } else { value };
Some((key, value))
}));
// `content_stream` reads from the content of the `Mutex`, so we need to clone the data
// into a `Vec` before returning.
let collected = naive_apply_query(content_stream, query)
.collect()
.wait()
.expect(
"can only fail if either `naive_apply_query` or `content_stream` produce \
an error, which cann't happen",
);
let output_stream = iter_ok(collected.into_iter());
Box::new(output_stream) as Box<_>
}
}
impl<T> Drop for JsonFileDatastore<T>
where
T: Serialize + DeserializeOwned + Clone,
{
#[inline]
fn drop(&mut self) {
// Unfortunately there's not much we can do here in case of an error, as panicking would be
// very bad. Similar to `File`, the user should take care to call `flush()` before dropping
// the datastore.
//
// If an error happens here, any change since the last successful flush will be lost, but
// the data will not be corrupted.
let _ = self.flush();
}
}
/// Implementation of `Datastore` that uses a single plain JSON file.
pub struct JsonFileDatastoreEntry<'a, T>(WriteGuard<'a, String, T>)
where
T: 'a;
impl<'a, T> Deref for JsonFileDatastoreEntry<'a, T>
where
T: 'a,
{
type Target = T;
fn deref(&self) -> &T {
&*self.0
}
}
impl<'a, T> DerefMut for JsonFileDatastoreEntry<'a, T>
where
T: 'a,
{
fn deref_mut(&mut self) -> &mut T {
&mut *self.0
}
}
#[cfg(test)]
mod tests {
use Datastore;
use JsonFileDatastore;
use futures::{Future, Stream};
use tempfile::NamedTempFile;
use {Filter, FilterOp, FilterTy, Order, Query};
#[test]
fn open_and_flush() {
let path = NamedTempFile::new().unwrap().into_temp_path();
let datastore = JsonFileDatastore::<Vec<u8>>::new(&path).unwrap();
datastore.flush().unwrap();
}
#[test]
fn values_store_and_reload() {
let path = NamedTempFile::new().unwrap().into_temp_path();
let datastore = JsonFileDatastore::<Vec<u8>>::new(&path).unwrap();
datastore.put("foo".into(), vec![1, 2, 3]);
datastore.put("bar".into(), vec![0, 255, 127]);
datastore.flush().unwrap();
drop(datastore);
let reload = JsonFileDatastore::<Vec<u8>>::new(&path).unwrap();
assert_eq!(reload.get("bar").unwrap(), &[0, 255, 127]);
assert_eq!(reload.get("foo").unwrap(), &[1, 2, 3]);
}
#[test]
fn query_basic() {
let path = NamedTempFile::new().unwrap().into_temp_path();
let datastore = JsonFileDatastore::<Vec<u8>>::new(&path).unwrap();
datastore.put("foo1".into(), vec![6, 7, 8]);
datastore.put("foo2".into(), vec![6, 7, 8]);
datastore.put("foo3".into(), vec![7, 8, 9]);
datastore.put("foo4".into(), vec![10, 11, 12]);
datastore.put("foo5".into(), vec![13, 14, 15]);
datastore.put("bar1".into(), vec![0, 255, 127]);
datastore.flush().unwrap();
let query = datastore
.query(Query {
prefix: "fo".into(),
filters: vec![
Filter {
ty: FilterTy::ValueCompare(&vec![6, 7, 8].into()),
operation: FilterOp::NotEqual,
},
],
orders: vec![Order::ByKeyDesc],
skip: 1,
limit: u64::max_value(),
keys_only: false,
})
.collect()
.wait()
.unwrap();
assert_eq!(query[0].0, "foo4");
assert_eq!(query[0].1, &[10, 11, 12]);
assert_eq!(query[1].0, "foo3");
assert_eq!(query[1].1, &[7, 8, 9]);
}
}

View File

@ -1,177 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! General-purpose key-value storage.
//! The keys are strings, and the values are of any type you want.
//!
//! > **Note**: This crate is meant to be a utility for the implementation of other crates; it
//! > does not directly participate in the stack of libp2p.
//!
//! This crate provides the `Datastore` trait, whose template parameter is the type of the value.
//! It is implemented on types that represent a key-value storage.
//! The only available implementation for now is `JsonFileDatastore`.
//!
//! # JSON file datastore
//!
//! The `JsonFileDatastore` can provide a key-value storage that loads and stores data in a single
//! JSON file. It is only available if the value implements the `Serialize`, `DeserializeOwned`
//! and `Clone` traits.
//!
//! The `JsonFileDatastore::new` method will attempt to load existing data from the path you pass
//! as parameter. This path is also where the data will be stored. The content of the store is
//! flushed on drop or if you call `flush()`.
//!
//! ```no_run
//! use datastore::Datastore;
//! use datastore::JsonFileDatastore;
//!
//! let datastore = JsonFileDatastore::<Vec<u8>>::new("/tmp/test.json").unwrap();
//! datastore.put("foo".into(), vec![1, 2, 3]);
//! datastore.put("bar".into(), vec![0, 255, 127]);
//! assert_eq!(datastore.get("foo").unwrap(), &[1, 2, 3]);
//! datastore.flush().unwrap(); // optional
//! ```
//!
//! # Query
//!
//! In addition to simple operations such as `get` or `put`, the `Datastore` trait also provides
//! a way to perform queries on the key-value storage, using the `query` method.
//!
//! The struct returned by the `query` method implements the `Stream` trait from `futures`,
//! meaning that the result is asynchronous.
//!
//! > **Note**: For now the API of the `get` and `has` methods makes them potentially blocking
//! > operations, though the only available implementation doesn't block. The API of these
//! > methods may become asynchronous in the future if deemed necessary.
//!
//! ```no_run
//! extern crate datastore;
//! extern crate futures;
//!
//! # fn main() {
//! use datastore::{Query, Order, Filter, FilterTy, FilterOp};
//! use datastore::Datastore;
//! use datastore::JsonFileDatastore;
//! use futures::{Future, Stream};
//!
//! let datastore = JsonFileDatastore::<Vec<u8>>::new("/tmp/test.json").unwrap();
//! let query = datastore.query(Query {
//! // Only return the keys that start with this prefix.
//! prefix: "fo".into(),
//! // List of filters for the keys and/or values.
//! filters: vec![
//! Filter {
//! ty: FilterTy::ValueCompare(&vec![6, 7, 8].into()),
//! operation: FilterOp::NotEqual,
//! },
//! ],
//! // Order in which to sort the results.
//! orders: vec![Order::ByKeyDesc],
//! // Number of entries to skip at the beginning of the results (after sorting).
//! skip: 1,
//! // Limit to the number of entries to return (use `u64::max_value()` for no limit).
//! limit: 12,
//! // If true, don't load the values. For optimization purposes.
//! keys_only: false,
//! });
//!
//! let results = query.collect().wait().unwrap();
//! println!("{:?}", results);
//! # }
//! ```
extern crate base64;
extern crate chashmap;
#[macro_use]
extern crate futures;
extern crate serde;
extern crate serde_json;
extern crate tempfile;
use futures::Stream;
use std::borrow::Cow;
use std::io::Error as IoError;
use std::ops::DerefMut;
mod json_file;
mod query;
pub use self::json_file::{JsonFileDatastore, JsonFileDatastoreEntry};
pub use self::query::{Filter, FilterOp, FilterTy, Order, Query};
/// Abstraction over any struct that can store `(key, value)` pairs.
pub trait Datastore<T> {
/// Locked entry.
type Entry: DerefMut<Target = T>;
/// Output of a query.
type QueryResult: Stream<Item = (String, T), Error = IoError>;
/// Sets the value of a key.
#[inline]
fn put(self, key: Cow<str>, value: T)
where
Self: Sized,
{
*self.lock_or_create(key) = value;
}
/// Checks if an entry exists, and if so locks it.
///
/// Trying to lock a value that is already locked will block, therefore you should keep locks
/// for a duration that is as short as possible.
fn lock(self, key: Cow<str>) -> Option<Self::Entry>;
/// Locks an entry if it exists, or creates it otherwise.
///
/// Same as `put` followed with `lock`, except that it is atomic.
fn lock_or_create(self, key: Cow<str>) -> Self::Entry;
/// Returns the value corresponding to this key by cloning it.
#[inline]
fn get(self, key: &str) -> Option<T>
where
Self: Sized,
T: Clone,
{
self.lock(key.into()).map(|v| v.clone())
}
/// Returns true if the datastore contains the given key.
///
/// > **Note**: Keep in mind that using this operation is probably racy. A secondary thread
/// > can delete a key right after you called `has()`. In other words, this function
/// > returns whether an entry with that key existed in the short past.
#[inline]
fn has(self, key: &str) -> bool
where
Self: Sized,
{
self.lock(key.into()).is_some()
}
/// Removes the given key from the datastore. Returns the old value if the key existed.
fn delete(self, key: &str) -> Option<T>;
/// Executes a query on the key-value store.
///
/// This operation is expensive on some implementations and cheap on others. It is your
/// responsibility to pick the right implementation for the right job.
fn query(self, query: Query<T>) -> Self::QueryResult;
}

View File

@ -1,350 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::stream::{iter_ok, Skip as StreamSkip, Take as StreamTake};
use futures::{Async, Future, Poll, Stream};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::io::Error as IoError;
use std::marker::PhantomData;
use std::vec::IntoIter as VecIntoIter;
/// Description of a query to apply on a datastore.
///
/// The various modifications of the dataset are applied in the same order as the fields (prefix,
/// filters, orders, skip, limit).
#[derive(Debug, Clone)]
pub struct Query<'a, T: 'a> {
/// Only the keys that start with `prefix` will be returned.
pub prefix: Cow<'a, str>,
/// Filters to apply on the results.
pub filters: Vec<Filter<'a, T>>,
/// How to order the keys. Applied sequentially.
pub orders: Vec<Order>,
/// Number of elements to skip from at the start of the results.
pub skip: u64,
/// Maximum number of elements in the results.
pub limit: u64,
/// Only return keys. If true, then all the `Vec`s of the data will be empty.
pub keys_only: bool,
}
/// A filter to apply to the results set.
#[derive(Debug, Clone)]
pub struct Filter<'a, T: 'a> {
/// Type of filter and value to compare with.
pub ty: FilterTy<'a, T>,
/// Comparison operation.
pub operation: FilterOp,
}
/// Type of filter and value to compare with.
#[derive(Debug, Clone)]
pub enum FilterTy<'a, T: 'a> {
/// Compare the key with a reference value.
KeyCompare(Cow<'a, str>),
/// Compare the value with a reference value.
ValueCompare(&'a T),
}
/// Filtering operation.
#[derive(Debug, Copy, Clone)]
pub enum FilterOp {
Equal,
NotEqual,
Less,
LessOrEqual,
Greater,
GreaterOrEqual,
}
/// Order in which to sort the results of a query.
#[derive(Debug, Copy, Clone)]
pub enum Order {
/// Put the values in ascending order.
ByValueAsc,
/// Put the values in descending order.
ByValueDesc,
/// Put the keys in ascending order.
ByKeyAsc,
/// Put the keys in descending order.
ByKeyDesc,
}
/// Naively applies a query on a set of results.
pub fn naive_apply_query<'a, S, V>(
stream: S,
query: Query<'a, V>,
) -> StreamTake<
StreamSkip<
NaiveKeysOnlyApply<
NaiveApplyOrdered<
NaiveFiltersApply<'a, NaivePrefixApply<'a, S>, VecIntoIter<Filter<'a, V>>>,
V,
>,
>,
>,
>
where
S: Stream<Item = (String, V), Error = IoError> + 'a,
V: Clone + PartialOrd + Default + 'static,
{
let prefixed = naive_apply_prefix(stream, query.prefix);
let filtered = naive_apply_filters(prefixed, query.filters.into_iter());
let ordered = naive_apply_ordered(filtered, query.orders);
let keys_only = naive_apply_keys_only(ordered, query.keys_only);
naive_apply_skip_limit(keys_only, query.skip, query.limit)
}
/// Skips the `skip` first element of a stream and only returns `limit` elements.
#[inline]
pub fn naive_apply_skip_limit<S, T>(stream: S, skip: u64, limit: u64) -> StreamTake<StreamSkip<S>>
where
S: Stream<Item = (String, T), Error = IoError>,
{
stream.skip(skip).take(limit)
}
/// Filters the result of a stream to empty values if `keys_only` is true.
#[inline]
pub fn naive_apply_keys_only<S, T>(stream: S, keys_only: bool) -> NaiveKeysOnlyApply<S>
where
S: Stream<Item = (String, T), Error = IoError>,
{
NaiveKeysOnlyApply {
keys_only: keys_only,
stream: stream,
}
}
/// Returned by `naive_apply_keys_only`.
#[derive(Debug, Clone)]
pub struct NaiveKeysOnlyApply<S> {
keys_only: bool,
stream: S,
}
impl<S, T> Stream for NaiveKeysOnlyApply<S>
where
S: Stream<Item = (String, T), Error = IoError>,
T: Default,
{
type Item = (String, T);
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.keys_only {
Ok(Async::Ready(try_ready!(self.stream.poll()).map(|mut v| {
v.1 = Default::default();
v
})))
} else {
self.stream.poll()
}
}
}
/// Filters the result of a stream to only keep the results with a prefix.
#[inline]
pub fn naive_apply_prefix<'a, S, T>(stream: S, prefix: Cow<'a, str>) -> NaivePrefixApply<'a, S>
where
S: Stream<Item = (String, T), Error = IoError>,
{
NaivePrefixApply {
prefix: prefix,
stream: stream,
}
}
/// Returned by `naive_apply_prefix`.
#[derive(Debug, Clone)]
pub struct NaivePrefixApply<'a, S> {
prefix: Cow<'a, str>,
stream: S,
}
impl<'a, S, T> Stream for NaivePrefixApply<'a, S>
where
S: Stream<Item = (String, T), Error = IoError>,
{
type Item = (String, T);
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
let item = try_ready!(self.stream.poll());
match item {
Some(i) => {
if i.0.starts_with(&*self.prefix) {
return Ok(Async::Ready(Some(i)));
}
}
None => return Ok(Async::Ready(None)),
}
}
}
}
/// Applies orderings on the stream data. Will simply pass data through if the list of orderings
/// is empty. Otherwise will need to collect.
pub fn naive_apply_ordered<'a, S, I, V>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S, V>
where
S: Stream<Item = (String, V), Error = IoError> + 'a,
I: IntoIterator<Item = Order>,
I::IntoIter: 'a,
V: PartialOrd + 'static,
{
let orders_iter = orders_iter.into_iter();
if orders_iter.size_hint().1 == Some(0) {
return NaiveApplyOrdered {
inner: NaiveApplyOrderedInner::PassThrough(stream),
};
}
let collected = stream
.collect()
.and_then(move |mut collected| {
for order in orders_iter {
match order {
Order::ByValueAsc => {
collected.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
}
Order::ByValueDesc => {
collected.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal));
}
Order::ByKeyAsc => {
collected.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
}
Order::ByKeyDesc => {
collected.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(Ordering::Equal));
}
}
}
Ok(iter_ok(collected.into_iter()))
})
.flatten_stream();
NaiveApplyOrdered {
inner: NaiveApplyOrderedInner::Collected(Box::new(collected)),
}
}
/// Returned by `naive_apply_ordered`.
pub struct NaiveApplyOrdered<'a, S, T> {
inner: NaiveApplyOrderedInner<'a, S, T>,
}
enum NaiveApplyOrderedInner<'a, S, T> {
PassThrough(S),
Collected(Box<Stream<Item = (String, T), Error = IoError> + 'a>),
}
impl<'a, S, V> Stream for NaiveApplyOrdered<'a, S, V>
where
S: Stream<Item = (String, V), Error = IoError>,
{
type Item = (String, V);
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.inner {
NaiveApplyOrderedInner::PassThrough(ref mut s) => s.poll(),
NaiveApplyOrderedInner::Collected(ref mut s) => s.poll(),
}
}
}
/// Filters the result of a stream to apply a set of filters.
#[inline]
pub fn naive_apply_filters<'a, S, I, V>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I>
where
S: Stream<Item = (String, V), Error = IoError>,
I: Iterator<Item = Filter<'a, V>> + Clone,
V: 'a,
{
NaiveFiltersApply {
filters: filters,
stream: stream,
marker: PhantomData,
}
}
/// Returned by `naive_apply_prefix`.
#[derive(Debug, Clone)]
pub struct NaiveFiltersApply<'a, S, I> {
filters: I,
stream: S,
marker: PhantomData<&'a ()>,
}
impl<'a, S, I, T> Stream for NaiveFiltersApply<'a, S, I>
where
S: Stream<Item = (String, T), Error = IoError>,
I: Iterator<Item = Filter<'a, T>> + Clone,
T: PartialOrd + 'a,
{
type Item = (String, T);
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
'outer: loop {
let item = try_ready!(self.stream.poll());
match item {
Some(i) => {
for filter in self.filters.clone() {
if !naive_filter_test(&i, &filter) {
continue 'outer;
}
}
return Ok(Async::Ready(Some(i)));
}
None => return Ok(Async::Ready(None)),
}
}
}
}
#[inline]
fn naive_filter_test<T>(entry: &(String, T), filter: &Filter<T>) -> bool
where
T: PartialOrd,
{
let (expected_ordering, revert_expected) = match filter.operation {
FilterOp::Equal => (Ordering::Equal, false),
FilterOp::NotEqual => (Ordering::Equal, true),
FilterOp::Less => (Ordering::Less, false),
FilterOp::GreaterOrEqual => (Ordering::Less, true),
FilterOp::Greater => (Ordering::Greater, false),
FilterOp::LessOrEqual => (Ordering::Greater, true),
};
match filter.ty {
FilterTy::KeyCompare(ref ref_value) => {
((&*entry.0).cmp(&**ref_value) == expected_ordering) != revert_expected
}
FilterTy::ValueCompare(ref ref_value) => {
(entry.1.partial_cmp(&**ref_value) == Some(expected_ordering)) != revert_expected
}
}
}

View File

@ -1,20 +0,0 @@
[package]
name = "libp2p-peerstore"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
[dependencies]
bs58 = "0.2.0"
datastore = { path = "../../stores/datastore" }
futures = "0.1.0"
owning_ref = "0.3.3"
libp2p-core = { path = "../../core" }
multiaddr = { path = "../../misc/multiaddr" }
serde = "1.0.70"
serde_derive = "1.0.70"
[dev-dependencies]
tempfile = "2.2"
serde_json = "1.0"
multihash = { path = "../../misc/multihash" }

View File

@ -1,171 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Implementation of the `Peerstore` trait that uses a single JSON file as backend.
use super::TTL;
use bs58;
use datastore::{Datastore, JsonFileDatastore, JsonFileDatastoreEntry, Query};
use futures::{Future, Stream};
use multiaddr::Multiaddr;
use peer_info::{AddAddrBehaviour, PeerInfo};
use peerstore::{PeerAccess, Peerstore};
use std::io::Error as IoError;
use std::iter;
use std::path::PathBuf;
use std::vec::IntoIter as VecIntoIter;
use PeerId;
/// Peerstore backend that uses a Json file.
pub struct JsonPeerstore {
store: JsonFileDatastore<PeerInfo>,
}
impl JsonPeerstore {
/// Opens a new peerstore tied to a JSON file at the given path.
///
/// If the file exists, this function will open it. In any case, flushing the peerstore or
/// destroying it will write to the file.
#[inline]
pub fn new<P>(path: P) -> Result<JsonPeerstore, IoError>
where
P: Into<PathBuf>,
{
Ok(JsonPeerstore {
store: JsonFileDatastore::new(path)?,
})
}
/// Flushes the content of the peer store to the disk.
///
/// This function can only fail in case of a disk access error. If an error occurs, any change
/// to the peerstore that was performed since the last successful flush will be lost. No data
/// will be corrupted.
#[inline]
pub fn flush(&self) -> Result<(), IoError> {
self.store.flush()
}
}
impl<'a> Peerstore for &'a JsonPeerstore {
type PeerAccess = JsonPeerstoreAccess<'a>;
type PeersIter = Box<Iterator<Item = PeerId>>;
#[inline]
fn peer(self, peer_id: &PeerId) -> Option<Self::PeerAccess> {
let hash = bs58::encode(peer_id.as_bytes()).into_string();
self.store.lock(hash.into()).map(JsonPeerstoreAccess)
}
#[inline]
fn peer_or_create(self, peer_id: &PeerId) -> Self::PeerAccess {
let hash = bs58::encode(peer_id.as_bytes()).into_string();
JsonPeerstoreAccess(self.store.lock_or_create(hash.into()))
}
fn peers(self) -> Self::PeersIter {
let query = self.store.query(Query {
prefix: "".into(),
filters: vec![],
orders: vec![],
skip: 0,
limit: u64::max_value(),
keys_only: false,
});
let list = query
.filter_map(|(key, info)| {
if info.addrs().count() == 0 {
return None // all addresses are expired
}
// We filter out invalid elements. This can happen if the JSON storage file was
// corrupted or manually modified by the user.
PeerId::from_bytes(bs58::decode(key).into_vec().ok()?).ok()
})
.collect()
.wait(); // Wait can never block for the JSON datastore.
// Need to handle I/O errors. Again we just ignore.
if let Ok(list) = list {
Box::new(list.into_iter()) as Box<_>
} else {
Box::new(iter::empty()) as Box<_>
}
}
}
pub struct JsonPeerstoreAccess<'a>(JsonFileDatastoreEntry<'a, PeerInfo>);
impl<'a> PeerAccess for JsonPeerstoreAccess<'a> {
type AddrsIter = VecIntoIter<Multiaddr>;
#[inline]
fn addrs(&self) -> Self::AddrsIter {
self.0.addrs().cloned().collect::<Vec<_>>().into_iter()
}
#[inline]
fn add_addr(&mut self, addr: Multiaddr, ttl: TTL) {
self.0
.add_addr(addr, ttl, AddAddrBehaviour::IgnoreTtlIfInferior);
}
#[inline]
fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL) {
self.0.add_addr(addr, ttl, AddAddrBehaviour::OverwriteTtl);
}
#[inline]
fn clear_addrs(&mut self) {
self.0.set_addrs(iter::empty());
}
}
#[cfg(test)]
mod tests {
extern crate tempfile;
peerstore_tests!(
{::json_peerstore::JsonPeerstore::new(temp_file.path()).unwrap()}
{let temp_file = self::tempfile::NamedTempFile::new().unwrap()}
);
#[test]
fn reload() {
let temp_file = self::tempfile::NamedTempFile::new().unwrap();
let peer_store = ::json_peerstore::JsonPeerstore::new(temp_file.path()).unwrap();
let peer_id = PeerId::random();
let addr = "/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap();
peer_store
.peer_or_create(&peer_id)
.add_addr(addr.clone(), Duration::from_millis(5000));
peer_store.flush().unwrap();
drop(peer_store);
let peer_store = ::json_peerstore::JsonPeerstore::new(temp_file.path()).unwrap();
let addrs = peer_store
.peer(&peer_id)
.unwrap()
.addrs()
.collect::<Vec<_>>();
assert_eq!(addrs, &[addr]);
}
}

View File

@ -1,94 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! The `peerstore` crate allows one to store information about a peer.
//!
//! `peerstore` is a key-value database, where the keys are multihashes (which usually corresponds
//! to the hash of the public key of the peer, but that is not enforced by this crate) and the
//! values are the public key and a list of multiaddresses. Additionally, the multiaddresses stored
//! by the `peerstore` have a time-to-live after which they disappear.
//!
//! This crate consists of a generic `Peerstore` trait and the follow implementations:
//!
//! - `JsonPeerstore`: Stores the information in a single JSON file.
//! - `MemoryPeerstore`: Stores the information in memory.
//!
//! Note that the peerstore implementations do not consider information inside a peer store to be
//! critical. In case of an error (e.g. corrupted file, disk error, etc.) they will prefer to lose
//! data rather than returning the error.
//!
//! # Example
//!
//! ```
//! extern crate multiaddr;
//! extern crate libp2p_core;
//! extern crate libp2p_peerstore;
//!
//! # fn main() {
//! use libp2p_core::{PeerId, PublicKey};
//! use libp2p_peerstore::memory_peerstore::MemoryPeerstore;
//! use libp2p_peerstore::{Peerstore, PeerAccess};
//! use multiaddr::Multiaddr;
//! use std::time::Duration;
//!
//! // In this example we use a `MemoryPeerstore`, but you can easily swap it for another backend.
//! let mut peerstore = MemoryPeerstore::empty();
//! let peer_id = PeerId::random();
//!
//! // Let's write some information about a peer.
//! {
//! // `peer_or_create` mutably borrows the peerstore, so we have to do it in a local scope.
//! let mut peer = peerstore.peer_or_create(&peer_id);
//! peer.add_addr("/ip4/10.11.12.13/tcp/20000".parse::<Multiaddr>().unwrap(),
//! Duration::from_millis(5000));
//! }
//!
//! // Now let's load back the info.
//! {
//! let mut peer = peerstore.peer(&peer_id).expect("peer doesn't exist in the peerstore");
//! assert_eq!(peer.addrs().collect::<Vec<_>>(),
//! &["/ip4/10.11.12.13/tcp/20000".parse::<Multiaddr>().unwrap()]);
//! }
//! # }
//! ```
extern crate bs58;
extern crate datastore;
extern crate futures;
extern crate libp2p_core;
extern crate multiaddr;
extern crate owning_ref;
extern crate serde;
#[macro_use]
extern crate serde_derive;
// TODO: remove
pub use self::libp2p_core::PeerId;
pub use self::peerstore::{PeerAccess, Peerstore};
#[macro_use]
mod peerstore_tests;
pub mod json_peerstore;
pub mod memory_peerstore;
mod peer_info;
mod peerstore;
pub type TTL = std::time::Duration;

View File

@ -1,123 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Implementation of the `Peerstore` trait that simple stores peers in memory.
use super::TTL;
use multiaddr::Multiaddr;
use owning_ref::OwningRefMut;
use peer_info::{AddAddrBehaviour, PeerInfo};
use peerstore::{PeerAccess, Peerstore};
use std::collections::HashMap;
use std::iter;
use std::sync::{Mutex, MutexGuard};
use std::vec::IntoIter as VecIntoIter;
use PeerId;
/// Implementation of the `Peerstore` trait that simply stores the peer information in memory.
#[derive(Debug)]
pub struct MemoryPeerstore {
store: Mutex<HashMap<PeerId, PeerInfo>>,
}
impl MemoryPeerstore {
/// Initializes a new `MemoryPeerstore`. The database is initially empty.
#[inline]
pub fn empty() -> MemoryPeerstore {
MemoryPeerstore {
store: Mutex::new(HashMap::new()),
}
}
}
impl Default for MemoryPeerstore {
#[inline]
fn default() -> MemoryPeerstore {
MemoryPeerstore::empty()
}
}
impl<'a> Peerstore for &'a MemoryPeerstore {
type PeerAccess = MemoryPeerstoreAccess<'a>;
type PeersIter = VecIntoIter<PeerId>;
fn peer(self, peer_id: &PeerId) -> Option<Self::PeerAccess> {
let lock = self.store.lock().unwrap();
OwningRefMut::new(lock)
.try_map_mut(|n| n.get_mut(peer_id).ok_or(()))
.ok()
.map(MemoryPeerstoreAccess)
}
fn peer_or_create(self, peer_id: &PeerId) -> Self::PeerAccess {
let lock = self.store.lock().unwrap();
let r = OwningRefMut::new(lock)
.map_mut(|n| n.entry(peer_id.clone()).or_insert_with(|| PeerInfo::new()));
MemoryPeerstoreAccess(r)
}
fn peers(self) -> Self::PeersIter {
let lock = self.store.lock().unwrap();
lock.iter()
.filter_map(|(id, info)| {
if info.addrs().count() == 0 {
return None // all addresses are expired
}
Some(id.clone())
})
.collect::<Vec<_>>().into_iter()
}
}
// Note: Rust doesn't provide a `MutexGuard::map` method, otherwise we could directly store a
// `MutexGuard<'a, (&'a PeerId, &'a PeerInfo)>`.
pub struct MemoryPeerstoreAccess<'a>(
OwningRefMut<MutexGuard<'a, HashMap<PeerId, PeerInfo>>, PeerInfo>,
);
impl<'a> PeerAccess for MemoryPeerstoreAccess<'a> {
type AddrsIter = VecIntoIter<Multiaddr>;
#[inline]
fn addrs(&self) -> Self::AddrsIter {
self.0.addrs().cloned().collect::<Vec<_>>().into_iter()
}
#[inline]
fn add_addr(&mut self, addr: Multiaddr, ttl: TTL) {
self.0
.add_addr(addr, ttl, AddAddrBehaviour::IgnoreTtlIfInferior);
}
#[inline]
fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL) {
self.0.add_addr(addr, ttl, AddAddrBehaviour::OverwriteTtl);
}
#[inline]
fn clear_addrs(&mut self) {
self.0.set_addrs(iter::empty());
}
}
#[cfg(test)]
mod tests {
peerstore_tests!({ ::memory_peerstore::MemoryPeerstore::empty() });
}

View File

@ -1,139 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! The objective of the `peerstore` crate is to provide a key-value storage. Keys are peer IDs,
//! and the `PeerInfo` struct in this module is the value.
//!
//! Note that the `PeerInfo` struct implements `PartialOrd` in a dummy way so that it can be stored
//! in a `Datastore`.
//! If the `PeerInfo` struct ever gets exposed to the public API of the crate, we may want to give
//! more thoughts about this.
use multiaddr::Multiaddr;
use std::cmp::Ordering;
use std::time::SystemTime;
use TTL;
/// Information about a peer.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerInfo {
// Adresses, and the time at which they will be considered expired.
addrs: Vec<(Multiaddr, SystemTime)>,
}
impl PeerInfo {
/// Builds a new empty `PeerInfo`.
#[inline]
pub fn new() -> PeerInfo {
PeerInfo { addrs: vec![] }
}
/// Returns the list of the non-expired addresses stored in this `PeerInfo`.
///
/// > **Note**: Keep in mind that this function is racy because addresses can expire between
/// > the moment when you get them and the moment when you process them.
#[inline]
pub fn addrs<'a>(&'a self) -> impl Iterator<Item = &'a Multiaddr> + 'a {
let now = SystemTime::now();
self.addrs.iter().filter_map(move |(addr, expires)| {
if *expires >= now {
Some(addr)
} else {
None
}
})
}
/// Sets the list of addresses and their time-to-live.
///
/// This removes all previously-stored addresses and replaces them with new ones.
#[inline]
pub fn set_addrs<I>(&mut self, addrs: I)
where
I: IntoIterator<Item = (Multiaddr, TTL)>,
{
let now = SystemTime::now();
self.addrs = addrs
.into_iter()
.map(move |(addr, ttl)| (addr, now + ttl))
.collect();
}
/// Adds a single address and its time-to-live.
///
/// If the peer info already knows about that address, then what happens depends on the
/// `behaviour` parameter.
pub fn add_addr(&mut self, addr: Multiaddr, ttl: TTL, behaviour: AddAddrBehaviour) {
let expires = SystemTime::now() + ttl;
if let Some(&mut (_, ref mut existing_expires)) =
self.addrs.iter_mut().find(|&&mut (ref a, _)| a == &addr)
{
if behaviour == AddAddrBehaviour::OverwriteTtl || *existing_expires < expires {
*existing_expires = expires;
}
return;
}
self.addrs.push((addr, expires));
}
}
/// Behaviour of the `add_addr` function.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddAddrBehaviour {
/// Always overwrite the existing TTL.
OverwriteTtl,
/// Don't overwrite if the TTL is larger.
IgnoreTtlIfInferior,
}
// The reason why we need to implement the PartialOrd trait is that the datastore library (a
// key-value storage) which we use allows performing queries where the results can be ordered.
//
// Since the struct that implements PartialOrd is internal and since we never use this ordering
// feature, I think it's ok to have this code.
impl PartialOrd for PeerInfo {
#[inline]
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}
#[cfg(test)]
mod tests {
extern crate serde_json;
use super::*;
use std::time::UNIX_EPOCH;
#[test]
fn ser_and_deser() {
let peer_info = PeerInfo {
addrs: vec![(
"/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap(),
UNIX_EPOCH,
)],
};
let serialized = serde_json::to_string(&peer_info).unwrap();
let deserialized: PeerInfo = serde_json::from_str(&serialized).unwrap();
assert_eq!(peer_info, deserialized);
}
}

View File

@ -1,116 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use multiaddr::Multiaddr;
use std::time::Duration;
use {PeerId, TTL};
/// Implemented on objects that store peers.
///
/// Note that the methods of this trait take by ownership (i.e. `self` instead of `&self` or
/// `&mut self`). This was made so that the associated types could hold `self` internally and
/// because Rust doesn't have higher-ranked trait bounds yet.
///
/// Therefore this trait should likely be implemented on `&'a ConcretePeerstore` instead of
/// on `ConcretePeerstore`.
pub trait Peerstore {
/// Grants access to the a peer inside the peer store.
type PeerAccess: PeerAccess;
/// List of the peers in this peer store.
type PeersIter: Iterator<Item = PeerId>;
/// Grants access to a peer by its ID.
fn peer(self, peer_id: &PeerId) -> Option<Self::PeerAccess>;
/// Grants access to a peer by its ID or creates it.
fn peer_or_create(self, peer_id: &PeerId) -> Self::PeerAccess;
/// Returns a list of peers in this peer store.
///
/// Keep in mind that the trait implementation may allow new peers to be added or removed at
/// any time. If that is the case, you have to take into account that this is only an
/// indication.
fn peers(self) -> Self::PeersIter;
}
/// Implemented on objects that represent an open access to a peer stored in a peer store.
///
/// The exact semantics of "open access" depend on the trait implementation.
pub trait PeerAccess {
/// Iterator returned by `addrs`.
type AddrsIter: Iterator<Item = Multiaddr>;
/// Returns all known and non-expired addresses for a given peer.
///
/// > **Note**: Keep in mind that this function is racy because addresses can expire between
/// > the moment when you get them and the moment when you process them.
fn addrs(&self) -> Self::AddrsIter;
/// Adds an address to a peer.
///
/// If the manager already has this address stored and with a longer TTL, then the operation
/// is a no-op.
fn add_addr(&mut self, addr: Multiaddr, ttl: TTL);
// Similar to calling `add_addr` multiple times in a row.
#[inline]
fn add_addrs<I>(&mut self, addrs: I, ttl: TTL)
where
I: IntoIterator<Item = Multiaddr>,
{
for addr in addrs.into_iter() {
self.add_addr(addr, ttl);
}
}
/// Removes an address from a peer.
#[inline]
fn rm_addr(&mut self, addr: Multiaddr) {
self.set_addr_ttl(addr, Duration::new(0, 0));
}
// Similar to calling `rm_addr` multiple times in a row.
fn rm_addrs<I>(&mut self, addrs: I)
where
I: IntoIterator<Item = Multiaddr>,
{
for addr in addrs.into_iter() {
self.rm_addr(addr);
}
}
/// Sets the TTL of an address of a peer. Adds the address if it is currently unknown.
///
/// Contrary to `add_addr`, this operation is never a no-op.
fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL);
// Similar to calling `set_addr_ttl` multiple times in a row.
fn set_addrs_ttl<I>(&mut self, addrs: I, ttl: TTL)
where
I: IntoIterator<Item = Multiaddr>,
{
for addr in addrs.into_iter() {
self.set_addr_ttl(addr, ttl);
}
}
/// Removes all previously stored addresses.
fn clear_addrs(&mut self);
}

View File

@ -1,132 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Contains the `peerstore_tests!` macro which will test an implementation of `Peerstore`.
//! You need to pass as first parameter a way to create the `Peerstore` that you want to test.
//!
//! You can also pass as additional parameters a list of statements that will be inserted before
//! each test. This allows you to have the peerstore builder use variables created by these
//! statements.
#![cfg(test)]
macro_rules! peerstore_tests {
({$create_peerstore:expr} $({$stmt:stmt})*) => {
extern crate multihash;
use std::thread;
use std::time::Duration;
use {Peerstore, PeerAccess, PeerId};
use multiaddr::Multiaddr;
#[test]
fn initially_empty() {
$($stmt;)*
let peer_store = $create_peerstore;
let peer_id = PeerId::random();
assert_eq!(peer_store.peers().count(), 0);
assert!(peer_store.peer(&peer_id).is_none());
}
#[test]
fn set_then_get_addr() {
$($stmt;)*
let peer_store = $create_peerstore;
let peer_id = PeerId::random();
let addr = "/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap();
peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(5000));
let addrs = peer_store.peer(&peer_id).unwrap().addrs().collect::<Vec<_>>();
assert_eq!(addrs, &[addr]);
}
#[test]
fn add_expired_addr() {
// Add an already-expired address to a peer.
$($stmt;)*
let peer_store = $create_peerstore;
let peer_id = PeerId::random();
let addr = "/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap();
peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(0));
thread::sleep(Duration::from_millis(2));
let addrs = peer_store.peer(&peer_id).unwrap().addrs();
assert_eq!(addrs.count(), 0);
}
#[test]
fn clear_addrs() {
$($stmt;)*
let peer_store = $create_peerstore;
let peer_id = PeerId::random();
let addr = "/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap();
peer_store.peer_or_create(&peer_id)
.add_addr(addr.clone(), Duration::from_millis(5000));
peer_store.peer(&peer_id).unwrap().clear_addrs();
let addrs = peer_store.peer(&peer_id).unwrap().addrs();
assert_eq!(addrs.count(), 0);
}
#[test]
fn no_update_ttl() {
$($stmt;)*
let peer_store = $create_peerstore;
let peer_id = PeerId::random();
let addr1 = "/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap();
let addr2 = "/ip4/0.0.0.1/tcp/0".parse::<Multiaddr>().unwrap();
peer_store.peer_or_create(&peer_id)
.add_addr(addr1.clone(), Duration::from_millis(5000));
peer_store.peer_or_create(&peer_id)
.add_addr(addr2.clone(), Duration::from_millis(5000));
assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 2);
// `add_addr` must not overwrite the TTL because it's already higher
peer_store.peer_or_create(&peer_id).add_addr(addr1.clone(), Duration::from_millis(0));
thread::sleep(Duration::from_millis(2));
assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 2);
}
#[test]
fn force_update_ttl() {
$($stmt;)*
let peer_store = $create_peerstore;
let peer_id = PeerId::random();
let addr1 = "/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap();
let addr2 = "/ip4/0.0.0.1/tcp/0".parse::<Multiaddr>().unwrap();
peer_store.peer_or_create(&peer_id)
.add_addr(addr1.clone(), Duration::from_millis(5000));
peer_store.peer_or_create(&peer_id)
.add_addr(addr2.clone(), Duration::from_millis(5000));
assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 2);
peer_store.peer_or_create(&peer_id)
.set_addr_ttl(addr1.clone(), Duration::from_millis(0));
thread::sleep(Duration::from_millis(2));
assert_eq!(peer_store.peer(&peer_id).unwrap().addrs().count(), 1);
}
};
}

View File

@ -1,19 +0,0 @@
[package]
name = "libp2p-relay"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
[dependencies]
bytes = "0.4"
futures = "0.1"
libp2p-peerstore = { path = "../../stores/peerstore" }
libp2p-core = { path = "../../core" }
log = "0.4"
multiaddr = { path = "../../misc/multiaddr" }
protobuf = "2.0.2"
rand = "0.6"
tokio-codec = "0.1"
tokio-io = "0.1"
unsigned-varint = { version = "0.2.1", features = ["codec"] }
void = "1"

View File

@ -1,139 +0,0 @@
// Copyright (c) 2018 Tokio Contributors
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
// Based on copy.rs from tokio-io in https://github.com/tokio-rs/tokio
use futures::{Future, Poll, try_ready};
use std::io;
use tokio_io::{AsyncRead, AsyncWrite};
/// A future which will copy all data from a reader into a writer.
///
/// Created by the [`copy`] function, this future will resolve to the number of
/// bytes copied or an error if one happens.
///
/// [`copy`]: fn.copy.html
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Copy<R, W> {
reader: Option<R>,
read_done: bool,
flush_done: bool,
writer: Option<W>,
pos: usize,
cap: usize,
amt: u64,
buf: Box<[u8]>,
}
/// Creates a future which represents copying all the bytes from one object to
/// another.
///
/// The returned future will copy all the bytes read from `reader` into the
/// `writer` specified. This future will only complete once the `reader` has hit
/// EOF and all bytes have been written to and flushed from the `writer`
/// provided.
///
/// On success the number of bytes is returned and the `reader` and `writer` are
/// consumed. On error the error is returned and the I/O objects are consumed as
/// well.
pub fn flushing_copy<R, W>(reader: R, writer: W) -> Copy<R, W>
where R: AsyncRead,
W: AsyncWrite,
{
Copy {
reader: Some(reader),
read_done: false,
flush_done: true,
writer: Some(writer),
amt: 0,
pos: 0,
cap: 0,
buf: Box::new([0; 2048]),
}
}
impl<R, W> Future for Copy<R, W>
where R: AsyncRead,
W: AsyncWrite,
{
type Item = (u64, R, W);
type Error = io::Error;
fn poll(&mut self) -> Poll<(u64, R, W), io::Error> {
debug_assert!(self.reader.is_some() && self.writer.is_some(),
"poll() has been called again after returning Ok");
loop {
// Still not finished flushing
if !self.flush_done {
try_ready!(self.writer.as_mut().unwrap().poll_flush());
self.flush_done = true
}
// If our buffer is empty, then we need to read some data to
// continue.
if self.pos == self.cap && !self.read_done {
let reader = self.reader.as_mut().unwrap();
let n = try_ready!(reader.poll_read(&mut self.buf));
if n == 0 {
self.read_done = true;
} else {
self.pos = 0;
self.cap = n;
}
}
// If our buffer has some data, let's write it out!
while self.pos < self.cap {
let writer = self.writer.as_mut().unwrap();
let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap]));
if i == 0 {
return Err(io::Error::new(io::ErrorKind::WriteZero,
"write zero byte into writer"));
} else {
self.pos += i;
self.amt += i as u64;
}
}
// The buffered data has been written, let's flush it!
if self.pos == self.cap && !self.read_done {
self.flush_done = false;
continue
}
// Everything has been copied.
if self.pos == self.cap && self.read_done {
try_ready!(self.writer.as_mut().unwrap().poll_flush());
let reader = self.reader.take().unwrap();
let writer = self.writer.take().unwrap();
return Ok((self.amt, reader, writer).into())
}
}
}
}

View File

@ -1,76 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use libp2p_core::{PeerId, upgrade::UpgradeError};
use std::{fmt, io};
#[derive(Debug)]
pub enum RelayError<E> {
Io(io::Error),
Upgrade(UpgradeError<E>),
NoRelayFor(PeerId),
Message(&'static str),
#[doc(hidden)]
__Nonexhaustive
}
impl<E> fmt::Display for RelayError<E>
where
E: fmt::Display
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
RelayError::Io(e) => write!(f, "i/o error: {}", e),
RelayError::Upgrade(e) => write!(f, "upgrade error: {}", e),
RelayError::NoRelayFor(p) => write!(f, "no relay for peer: {:?}", p),
RelayError::Message(m) => write!(f, "{}", m),
RelayError::__Nonexhaustive => f.write_str("__Nonexhaustive")
}
}
}
impl<E> std::error::Error for RelayError<E>
where
E: std::error::Error
{
fn cause(&self) -> Option<&dyn std::error::Error> {
match self {
RelayError::Io(e) => Some(e),
RelayError::Upgrade(e) => Some(e),
RelayError::NoRelayFor(_) => None,
RelayError::Message(_) => None,
RelayError::__Nonexhaustive => None
}
}
}
impl<E> From<io::Error> for RelayError<E> {
fn from(e: io::Error) -> Self {
RelayError::Io(e)
}
}
impl<E> From<UpgradeError<E>> for RelayError<E> {
fn from(e: UpgradeError<E>) -> Self {
RelayError::Upgrade(e)
}
}

View File

@ -1,42 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
extern crate bytes;
extern crate futures;
extern crate libp2p_peerstore as peerstore;
extern crate libp2p_core;
extern crate log;
extern crate multiaddr;
extern crate protobuf;
extern crate rand;
extern crate tokio_codec;
extern crate tokio_io;
extern crate unsigned_varint;
extern crate void;
mod copy;
mod error;
mod message;
mod protocol;
mod transport;
mod utility;
pub use protocol::{Output, RelayConfig};
pub use transport::RelayTransport;

View File

@ -1,42 +0,0 @@
syntax = "proto2";
message CircuitRelay {
enum Status {
SUCCESS = 100;
HOP_SRC_ADDR_TOO_LONG = 220;
HOP_DST_ADDR_TOO_LONG = 221;
HOP_SRC_MULTIADDR_INVALID = 250;
HOP_DST_MULTIADDR_INVALID = 251;
HOP_NO_CONN_TO_DST = 260;
HOP_CANT_DIAL_DST = 261;
HOP_CANT_OPEN_DST_STREAM = 262;
HOP_CANT_SPEAK_RELAY = 270;
HOP_CANT_RELAY_TO_SELF = 280;
STOP_SRC_ADDR_TOO_LONG = 320;
STOP_DST_ADDR_TOO_LONG = 321;
STOP_SRC_MULTIADDR_INVALID = 350;
STOP_DST_MULTIADDR_INVALID = 351;
STOP_RELAY_REFUSED = 390;
MALFORMED_MESSAGE = 400;
}
enum Type { // RPC identifier, either HOP, STOP or STATUS
HOP = 1;
STOP = 2;
STATUS = 3;
CAN_HOP = 4; // is peer a relay?
}
message Peer {
required bytes id = 1; // peer id
repeated bytes addrs = 2; // peer's known addresses
}
optional Type type = 1; // Type of the message
optional Peer srcPeer = 2; // srcPeer and dstPeer are used when Type is HOP or STOP
optional Peer dstPeer = 3;
optional Status code = 4; // Status code, used when Type is STATUS
}

View File

@ -1,803 +0,0 @@
// This file is generated by rust-protobuf 2.0.2. Do not edit
// @generated
// https://github.com/Manishearth/rust-clippy/issues/702
#![allow(unknown_lints)]
#![allow(clippy)]
#![cfg_attr(rustfmt, rustfmt_skip)]
#![allow(box_pointers)]
#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(trivial_casts)]
#![allow(unsafe_code)]
#![allow(unused_imports)]
#![allow(unused_results)]
use protobuf::Message as Message_imported_for_functions;
use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions;
#[derive(PartialEq,Clone,Default)]
pub struct CircuitRelay {
// message fields
field_type: ::std::option::Option<CircuitRelay_Type>,
srcPeer: ::protobuf::SingularPtrField<CircuitRelay_Peer>,
dstPeer: ::protobuf::SingularPtrField<CircuitRelay_Peer>,
code: ::std::option::Option<CircuitRelay_Status>,
// special fields
unknown_fields: ::protobuf::UnknownFields,
cached_size: ::protobuf::CachedSize,
}
impl CircuitRelay {
pub fn new() -> CircuitRelay {
::std::default::Default::default()
}
// optional .CircuitRelay.Type type = 1;
pub fn clear_field_type(&mut self) {
self.field_type = ::std::option::Option::None;
}
pub fn has_field_type(&self) -> bool {
self.field_type.is_some()
}
// Param is passed by value, moved
pub fn set_field_type(&mut self, v: CircuitRelay_Type) {
self.field_type = ::std::option::Option::Some(v);
}
pub fn get_field_type(&self) -> CircuitRelay_Type {
self.field_type.unwrap_or(CircuitRelay_Type::HOP)
}
// optional .CircuitRelay.Peer srcPeer = 2;
pub fn clear_srcPeer(&mut self) {
self.srcPeer.clear();
}
pub fn has_srcPeer(&self) -> bool {
self.srcPeer.is_some()
}
// Param is passed by value, moved
pub fn set_srcPeer(&mut self, v: CircuitRelay_Peer) {
self.srcPeer = ::protobuf::SingularPtrField::some(v);
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_srcPeer(&mut self) -> &mut CircuitRelay_Peer {
if self.srcPeer.is_none() {
self.srcPeer.set_default();
}
self.srcPeer.as_mut().unwrap()
}
// Take field
pub fn take_srcPeer(&mut self) -> CircuitRelay_Peer {
self.srcPeer.take().unwrap_or_else(|| CircuitRelay_Peer::new())
}
pub fn get_srcPeer(&self) -> &CircuitRelay_Peer {
self.srcPeer.as_ref().unwrap_or_else(|| CircuitRelay_Peer::default_instance())
}
// optional .CircuitRelay.Peer dstPeer = 3;
pub fn clear_dstPeer(&mut self) {
self.dstPeer.clear();
}
pub fn has_dstPeer(&self) -> bool {
self.dstPeer.is_some()
}
// Param is passed by value, moved
pub fn set_dstPeer(&mut self, v: CircuitRelay_Peer) {
self.dstPeer = ::protobuf::SingularPtrField::some(v);
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_dstPeer(&mut self) -> &mut CircuitRelay_Peer {
if self.dstPeer.is_none() {
self.dstPeer.set_default();
}
self.dstPeer.as_mut().unwrap()
}
// Take field
pub fn take_dstPeer(&mut self) -> CircuitRelay_Peer {
self.dstPeer.take().unwrap_or_else(|| CircuitRelay_Peer::new())
}
pub fn get_dstPeer(&self) -> &CircuitRelay_Peer {
self.dstPeer.as_ref().unwrap_or_else(|| CircuitRelay_Peer::default_instance())
}
// optional .CircuitRelay.Status code = 4;
pub fn clear_code(&mut self) {
self.code = ::std::option::Option::None;
}
pub fn has_code(&self) -> bool {
self.code.is_some()
}
// Param is passed by value, moved
pub fn set_code(&mut self, v: CircuitRelay_Status) {
self.code = ::std::option::Option::Some(v);
}
pub fn get_code(&self) -> CircuitRelay_Status {
self.code.unwrap_or(CircuitRelay_Status::SUCCESS)
}
}
impl ::protobuf::Message for CircuitRelay {
fn is_initialized(&self) -> bool {
for v in &self.srcPeer {
if !v.is_initialized() {
return false;
}
};
for v in &self.dstPeer {
if !v.is_initialized() {
return false;
}
};
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
::protobuf::rt::read_proto2_enum_with_unknown_fields_into(wire_type, is, &mut self.field_type, 1, &mut self.unknown_fields)?
},
2 => {
::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.srcPeer)?;
},
3 => {
::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.dstPeer)?;
},
4 => {
::protobuf::rt::read_proto2_enum_with_unknown_fields_into(wire_type, is, &mut self.code, 4, &mut self.unknown_fields)?
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if let Some(v) = self.field_type {
my_size += ::protobuf::rt::enum_size(1, v);
}
if let Some(ref v) = self.srcPeer.as_ref() {
let len = v.compute_size();
my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
}
if let Some(ref v) = self.dstPeer.as_ref() {
let len = v.compute_size();
my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
}
if let Some(v) = self.code {
my_size += ::protobuf::rt::enum_size(4, v);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> {
if let Some(v) = self.field_type {
os.write_enum(1, v.value())?;
}
if let Some(ref v) = self.srcPeer.as_ref() {
os.write_tag(2, ::protobuf::wire_format::WireTypeLengthDelimited)?;
os.write_raw_varint32(v.get_cached_size())?;
v.write_to_with_cached_sizes(os)?;
}
if let Some(ref v) = self.dstPeer.as_ref() {
os.write_tag(3, ::protobuf::wire_format::WireTypeLengthDelimited)?;
os.write_raw_varint32(v.get_cached_size())?;
v.write_to_with_cached_sizes(os)?;
}
if let Some(v) = self.code {
os.write_enum(4, v.value())?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &::std::any::Any {
self as &::std::any::Any
}
fn as_any_mut(&mut self) -> &mut ::std::any::Any {
self as &mut ::std::any::Any
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<::std::any::Any> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> CircuitRelay {
CircuitRelay::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
};
unsafe {
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_option_accessor::<_, ::protobuf::types::ProtobufTypeEnum<CircuitRelay_Type>>(
"type",
|m: &CircuitRelay| { &m.field_type },
|m: &mut CircuitRelay| { &mut m.field_type },
));
fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<CircuitRelay_Peer>>(
"srcPeer",
|m: &CircuitRelay| { &m.srcPeer },
|m: &mut CircuitRelay| { &mut m.srcPeer },
));
fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<CircuitRelay_Peer>>(
"dstPeer",
|m: &CircuitRelay| { &m.dstPeer },
|m: &mut CircuitRelay| { &mut m.dstPeer },
));
fields.push(::protobuf::reflect::accessor::make_option_accessor::<_, ::protobuf::types::ProtobufTypeEnum<CircuitRelay_Status>>(
"code",
|m: &CircuitRelay| { &m.code },
|m: &mut CircuitRelay| { &mut m.code },
));
::protobuf::reflect::MessageDescriptor::new::<CircuitRelay>(
"CircuitRelay",
fields,
file_descriptor_proto()
)
})
}
}
fn default_instance() -> &'static CircuitRelay {
static mut instance: ::protobuf::lazy::Lazy<CircuitRelay> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const CircuitRelay,
};
unsafe {
instance.get(CircuitRelay::new)
}
}
}
impl ::protobuf::Clear for CircuitRelay {
fn clear(&mut self) {
self.clear_field_type();
self.clear_srcPeer();
self.clear_dstPeer();
self.clear_code();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for CircuitRelay {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for CircuitRelay {
fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
::protobuf::reflect::ProtobufValueRef::Message(self)
}
}
#[derive(PartialEq,Clone,Default)]
pub struct CircuitRelay_Peer {
// message fields
id: ::protobuf::SingularField<::std::vec::Vec<u8>>,
addrs: ::protobuf::RepeatedField<::std::vec::Vec<u8>>,
// special fields
unknown_fields: ::protobuf::UnknownFields,
cached_size: ::protobuf::CachedSize,
}
impl CircuitRelay_Peer {
pub fn new() -> CircuitRelay_Peer {
::std::default::Default::default()
}
// required bytes id = 1;
pub fn clear_id(&mut self) {
self.id.clear();
}
pub fn has_id(&self) -> bool {
self.id.is_some()
}
// Param is passed by value, moved
pub fn set_id(&mut self, v: ::std::vec::Vec<u8>) {
self.id = ::protobuf::SingularField::some(v);
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_id(&mut self) -> &mut ::std::vec::Vec<u8> {
if self.id.is_none() {
self.id.set_default();
}
self.id.as_mut().unwrap()
}
// Take field
pub fn take_id(&mut self) -> ::std::vec::Vec<u8> {
self.id.take().unwrap_or_else(|| ::std::vec::Vec::new())
}
pub fn get_id(&self) -> &[u8] {
match self.id.as_ref() {
Some(v) => &v,
None => &[],
}
}
// repeated bytes addrs = 2;
pub fn clear_addrs(&mut self) {
self.addrs.clear();
}
// Param is passed by value, moved
pub fn set_addrs(&mut self, v: ::protobuf::RepeatedField<::std::vec::Vec<u8>>) {
self.addrs = v;
}
// Mutable pointer to the field.
pub fn mut_addrs(&mut self) -> &mut ::protobuf::RepeatedField<::std::vec::Vec<u8>> {
&mut self.addrs
}
// Take field
pub fn take_addrs(&mut self) -> ::protobuf::RepeatedField<::std::vec::Vec<u8>> {
::std::mem::replace(&mut self.addrs, ::protobuf::RepeatedField::new())
}
pub fn get_addrs(&self) -> &[::std::vec::Vec<u8>] {
&self.addrs
}
}
impl ::protobuf::Message for CircuitRelay_Peer {
fn is_initialized(&self) -> bool {
if self.id.is_none() {
return false;
}
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.id)?;
},
2 => {
::protobuf::rt::read_repeated_bytes_into(wire_type, is, &mut self.addrs)?;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if let Some(ref v) = self.id.as_ref() {
my_size += ::protobuf::rt::bytes_size(1, &v);
}
for value in &self.addrs {
my_size += ::protobuf::rt::bytes_size(2, &value);
};
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> {
if let Some(ref v) = self.id.as_ref() {
os.write_bytes(1, &v)?;
}
for v in &self.addrs {
os.write_bytes(2, &v)?;
};
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &::std::any::Any {
self as &::std::any::Any
}
fn as_any_mut(&mut self) -> &mut ::std::any::Any {
self as &mut ::std::any::Any
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<::std::any::Any> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> CircuitRelay_Peer {
CircuitRelay_Peer::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
};
unsafe {
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"id",
|m: &CircuitRelay_Peer| { &m.id },
|m: &mut CircuitRelay_Peer| { &mut m.id },
));
fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"addrs",
|m: &CircuitRelay_Peer| { &m.addrs },
|m: &mut CircuitRelay_Peer| { &mut m.addrs },
));
::protobuf::reflect::MessageDescriptor::new::<CircuitRelay_Peer>(
"CircuitRelay_Peer",
fields,
file_descriptor_proto()
)
})
}
}
fn default_instance() -> &'static CircuitRelay_Peer {
static mut instance: ::protobuf::lazy::Lazy<CircuitRelay_Peer> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const CircuitRelay_Peer,
};
unsafe {
instance.get(CircuitRelay_Peer::new)
}
}
}
impl ::protobuf::Clear for CircuitRelay_Peer {
fn clear(&mut self) {
self.clear_id();
self.clear_addrs();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for CircuitRelay_Peer {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for CircuitRelay_Peer {
fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
::protobuf::reflect::ProtobufValueRef::Message(self)
}
}
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum CircuitRelay_Status {
SUCCESS = 100,
HOP_SRC_ADDR_TOO_LONG = 220,
HOP_DST_ADDR_TOO_LONG = 221,
HOP_SRC_MULTIADDR_INVALID = 250,
HOP_DST_MULTIADDR_INVALID = 251,
HOP_NO_CONN_TO_DST = 260,
HOP_CANT_DIAL_DST = 261,
HOP_CANT_OPEN_DST_STREAM = 262,
HOP_CANT_SPEAK_RELAY = 270,
HOP_CANT_RELAY_TO_SELF = 280,
STOP_SRC_ADDR_TOO_LONG = 320,
STOP_DST_ADDR_TOO_LONG = 321,
STOP_SRC_MULTIADDR_INVALID = 350,
STOP_DST_MULTIADDR_INVALID = 351,
STOP_RELAY_REFUSED = 390,
MALFORMED_MESSAGE = 400,
}
impl ::protobuf::ProtobufEnum for CircuitRelay_Status {
fn value(&self) -> i32 {
*self as i32
}
fn from_i32(value: i32) -> ::std::option::Option<CircuitRelay_Status> {
match value {
100 => ::std::option::Option::Some(CircuitRelay_Status::SUCCESS),
220 => ::std::option::Option::Some(CircuitRelay_Status::HOP_SRC_ADDR_TOO_LONG),
221 => ::std::option::Option::Some(CircuitRelay_Status::HOP_DST_ADDR_TOO_LONG),
250 => ::std::option::Option::Some(CircuitRelay_Status::HOP_SRC_MULTIADDR_INVALID),
251 => ::std::option::Option::Some(CircuitRelay_Status::HOP_DST_MULTIADDR_INVALID),
260 => ::std::option::Option::Some(CircuitRelay_Status::HOP_NO_CONN_TO_DST),
261 => ::std::option::Option::Some(CircuitRelay_Status::HOP_CANT_DIAL_DST),
262 => ::std::option::Option::Some(CircuitRelay_Status::HOP_CANT_OPEN_DST_STREAM),
270 => ::std::option::Option::Some(CircuitRelay_Status::HOP_CANT_SPEAK_RELAY),
280 => ::std::option::Option::Some(CircuitRelay_Status::HOP_CANT_RELAY_TO_SELF),
320 => ::std::option::Option::Some(CircuitRelay_Status::STOP_SRC_ADDR_TOO_LONG),
321 => ::std::option::Option::Some(CircuitRelay_Status::STOP_DST_ADDR_TOO_LONG),
350 => ::std::option::Option::Some(CircuitRelay_Status::STOP_SRC_MULTIADDR_INVALID),
351 => ::std::option::Option::Some(CircuitRelay_Status::STOP_DST_MULTIADDR_INVALID),
390 => ::std::option::Option::Some(CircuitRelay_Status::STOP_RELAY_REFUSED),
400 => ::std::option::Option::Some(CircuitRelay_Status::MALFORMED_MESSAGE),
_ => ::std::option::Option::None
}
}
fn values() -> &'static [Self] {
static values: &'static [CircuitRelay_Status] = &[
CircuitRelay_Status::SUCCESS,
CircuitRelay_Status::HOP_SRC_ADDR_TOO_LONG,
CircuitRelay_Status::HOP_DST_ADDR_TOO_LONG,
CircuitRelay_Status::HOP_SRC_MULTIADDR_INVALID,
CircuitRelay_Status::HOP_DST_MULTIADDR_INVALID,
CircuitRelay_Status::HOP_NO_CONN_TO_DST,
CircuitRelay_Status::HOP_CANT_DIAL_DST,
CircuitRelay_Status::HOP_CANT_OPEN_DST_STREAM,
CircuitRelay_Status::HOP_CANT_SPEAK_RELAY,
CircuitRelay_Status::HOP_CANT_RELAY_TO_SELF,
CircuitRelay_Status::STOP_SRC_ADDR_TOO_LONG,
CircuitRelay_Status::STOP_DST_ADDR_TOO_LONG,
CircuitRelay_Status::STOP_SRC_MULTIADDR_INVALID,
CircuitRelay_Status::STOP_DST_MULTIADDR_INVALID,
CircuitRelay_Status::STOP_RELAY_REFUSED,
CircuitRelay_Status::MALFORMED_MESSAGE,
];
values
}
fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::EnumDescriptor> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::reflect::EnumDescriptor,
};
unsafe {
descriptor.get(|| {
::protobuf::reflect::EnumDescriptor::new("CircuitRelay_Status", file_descriptor_proto())
})
}
}
}
impl ::std::marker::Copy for CircuitRelay_Status {
}
impl ::protobuf::reflect::ProtobufValue for CircuitRelay_Status {
fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
::protobuf::reflect::ProtobufValueRef::Enum(self.descriptor())
}
}
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum CircuitRelay_Type {
HOP = 1,
STOP = 2,
STATUS = 3,
CAN_HOP = 4,
}
impl ::protobuf::ProtobufEnum for CircuitRelay_Type {
fn value(&self) -> i32 {
*self as i32
}
fn from_i32(value: i32) -> ::std::option::Option<CircuitRelay_Type> {
match value {
1 => ::std::option::Option::Some(CircuitRelay_Type::HOP),
2 => ::std::option::Option::Some(CircuitRelay_Type::STOP),
3 => ::std::option::Option::Some(CircuitRelay_Type::STATUS),
4 => ::std::option::Option::Some(CircuitRelay_Type::CAN_HOP),
_ => ::std::option::Option::None
}
}
fn values() -> &'static [Self] {
static values: &'static [CircuitRelay_Type] = &[
CircuitRelay_Type::HOP,
CircuitRelay_Type::STOP,
CircuitRelay_Type::STATUS,
CircuitRelay_Type::CAN_HOP,
];
values
}
fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::EnumDescriptor> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::reflect::EnumDescriptor,
};
unsafe {
descriptor.get(|| {
::protobuf::reflect::EnumDescriptor::new("CircuitRelay_Type", file_descriptor_proto())
})
}
}
}
impl ::std::marker::Copy for CircuitRelay_Type {
}
impl ::protobuf::reflect::ProtobufValue for CircuitRelay_Type {
fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
::protobuf::reflect::ProtobufValueRef::Enum(self.descriptor())
}
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\rmessage.proto\"\xe3\x05\n\x0cCircuitRelay\x12&\n\x04type\x18\x01\x20\
\x01(\x0e2\x12.CircuitRelay.TypeR\x04type\x12,\n\x07srcPeer\x18\x02\x20\
\x01(\x0b2\x12.CircuitRelay.PeerR\x07srcPeer\x12,\n\x07dstPeer\x18\x03\
\x20\x01(\x0b2\x12.CircuitRelay.PeerR\x07dstPeer\x12(\n\x04code\x18\x04\
\x20\x01(\x0e2\x14.CircuitRelay.StatusR\x04code\x1a,\n\x04Peer\x12\x0e\n\
\x02id\x18\x01\x20\x02(\x0cR\x02id\x12\x14\n\x05addrs\x18\x02\x20\x03(\
\x0cR\x05addrs\"\xc2\x03\n\x06Status\x12\x0b\n\x07SUCCESS\x10d\x12\x1a\n\
\x15HOP_SRC_ADDR_TOO_LONG\x10\xdc\x01\x12\x1a\n\x15HOP_DST_ADDR_TOO_LONG\
\x10\xdd\x01\x12\x1e\n\x19HOP_SRC_MULTIADDR_INVALID\x10\xfa\x01\x12\x1e\
\n\x19HOP_DST_MULTIADDR_INVALID\x10\xfb\x01\x12\x17\n\x12HOP_NO_CONN_TO_\
DST\x10\x84\x02\x12\x16\n\x11HOP_CANT_DIAL_DST\x10\x85\x02\x12\x1d\n\x18\
HOP_CANT_OPEN_DST_STREAM\x10\x86\x02\x12\x19\n\x14HOP_CANT_SPEAK_RELAY\
\x10\x8e\x02\x12\x1b\n\x16HOP_CANT_RELAY_TO_SELF\x10\x98\x02\x12\x1b\n\
\x16STOP_SRC_ADDR_TOO_LONG\x10\xc0\x02\x12\x1b\n\x16STOP_DST_ADDR_TOO_LO\
NG\x10\xc1\x02\x12\x1f\n\x1aSTOP_SRC_MULTIADDR_INVALID\x10\xde\x02\x12\
\x1f\n\x1aSTOP_DST_MULTIADDR_INVALID\x10\xdf\x02\x12\x17\n\x12STOP_RELAY\
_REFUSED\x10\x86\x03\x12\x16\n\x11MALFORMED_MESSAGE\x10\x90\x03\"2\n\x04\
Type\x12\x07\n\x03HOP\x10\x01\x12\x08\n\x04STOP\x10\x02\x12\n\n\x06STATU\
S\x10\x03\x12\x0b\n\x07CAN_HOP\x10\x04J\xbc\r\n\x06\x12\x04\0\0)\x01\n\
\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0)\x01\n\n\n\
\x03\x04\0\x01\x12\x03\x02\x08\x14\n\x0c\n\x04\x04\0\x04\0\x12\x04\x04\
\x04\x15\x05\n\x0c\n\x05\x04\0\x04\0\x01\x12\x03\x04\t\x0f\n\r\n\x06\x04\
\0\x04\0\x02\0\x12\x03\x05\x08)\n\x0e\n\x07\x04\0\x04\0\x02\0\x01\x12\
\x03\x05\x08\x0f\n\x0e\n\x07\x04\0\x04\0\x02\0\x02\x12\x03\x05%(\n\r\n\
\x06\x04\0\x04\0\x02\x01\x12\x03\x06\x08)\n\x0e\n\x07\x04\0\x04\0\x02\
\x01\x01\x12\x03\x06\x08\x1d\n\x0e\n\x07\x04\0\x04\0\x02\x01\x02\x12\x03\
\x06%(\n\r\n\x06\x04\0\x04\0\x02\x02\x12\x03\x07\x08)\n\x0e\n\x07\x04\0\
\x04\0\x02\x02\x01\x12\x03\x07\x08\x1d\n\x0e\n\x07\x04\0\x04\0\x02\x02\
\x02\x12\x03\x07%(\n\r\n\x06\x04\0\x04\0\x02\x03\x12\x03\x08\x08)\n\x0e\
\n\x07\x04\0\x04\0\x02\x03\x01\x12\x03\x08\x08!\n\x0e\n\x07\x04\0\x04\0\
\x02\x03\x02\x12\x03\x08%(\n\r\n\x06\x04\0\x04\0\x02\x04\x12\x03\t\x08)\
\n\x0e\n\x07\x04\0\x04\0\x02\x04\x01\x12\x03\t\x08!\n\x0e\n\x07\x04\0\
\x04\0\x02\x04\x02\x12\x03\t%(\n\r\n\x06\x04\0\x04\0\x02\x05\x12\x03\n\
\x08)\n\x0e\n\x07\x04\0\x04\0\x02\x05\x01\x12\x03\n\x08\x1a\n\x0e\n\x07\
\x04\0\x04\0\x02\x05\x02\x12\x03\n%(\n\r\n\x06\x04\0\x04\0\x02\x06\x12\
\x03\x0b\x08)\n\x0e\n\x07\x04\0\x04\0\x02\x06\x01\x12\x03\x0b\x08\x19\n\
\x0e\n\x07\x04\0\x04\0\x02\x06\x02\x12\x03\x0b%(\n\r\n\x06\x04\0\x04\0\
\x02\x07\x12\x03\x0c\x08)\n\x0e\n\x07\x04\0\x04\0\x02\x07\x01\x12\x03\
\x0c\x08\x20\n\x0e\n\x07\x04\0\x04\0\x02\x07\x02\x12\x03\x0c%(\n\r\n\x06\
\x04\0\x04\0\x02\x08\x12\x03\r\x08)\n\x0e\n\x07\x04\0\x04\0\x02\x08\x01\
\x12\x03\r\x08\x1c\n\x0e\n\x07\x04\0\x04\0\x02\x08\x02\x12\x03\r%(\n\r\n\
\x06\x04\0\x04\0\x02\t\x12\x03\x0e\x08)\n\x0e\n\x07\x04\0\x04\0\x02\t\
\x01\x12\x03\x0e\x08\x1e\n\x0e\n\x07\x04\0\x04\0\x02\t\x02\x12\x03\x0e%(\
\n\r\n\x06\x04\0\x04\0\x02\n\x12\x03\x0f\x08)\n\x0e\n\x07\x04\0\x04\0\
\x02\n\x01\x12\x03\x0f\x08\x1e\n\x0e\n\x07\x04\0\x04\0\x02\n\x02\x12\x03\
\x0f%(\n\r\n\x06\x04\0\x04\0\x02\x0b\x12\x03\x10\x08)\n\x0e\n\x07\x04\0\
\x04\0\x02\x0b\x01\x12\x03\x10\x08\x1e\n\x0e\n\x07\x04\0\x04\0\x02\x0b\
\x02\x12\x03\x10%(\n\r\n\x06\x04\0\x04\0\x02\x0c\x12\x03\x11\x08)\n\x0e\
\n\x07\x04\0\x04\0\x02\x0c\x01\x12\x03\x11\x08\"\n\x0e\n\x07\x04\0\x04\0\
\x02\x0c\x02\x12\x03\x11%(\n\r\n\x06\x04\0\x04\0\x02\r\x12\x03\x12\x08)\
\n\x0e\n\x07\x04\0\x04\0\x02\r\x01\x12\x03\x12\x08\"\n\x0e\n\x07\x04\0\
\x04\0\x02\r\x02\x12\x03\x12%(\n\r\n\x06\x04\0\x04\0\x02\x0e\x12\x03\x13\
\x08)\n\x0e\n\x07\x04\0\x04\0\x02\x0e\x01\x12\x03\x13\x08\x1a\n\x0e\n\
\x07\x04\0\x04\0\x02\x0e\x02\x12\x03\x13%(\n\r\n\x06\x04\0\x04\0\x02\x0f\
\x12\x03\x14\x08)\n\x0e\n\x07\x04\0\x04\0\x02\x0f\x01\x12\x03\x14\x08\
\x19\n\x0e\n\x07\x04\0\x04\0\x02\x0f\x02\x12\x03\x14%(\n:\n\x04\x04\0\
\x04\x01\x12\x04\x17\x04\x1c\x05\",\x20RPC\x20identifier,\x20either\x20H\
OP,\x20STOP\x20or\x20STATUS\n\n\x0c\n\x05\x04\0\x04\x01\x01\x12\x03\x17\
\t\r\n\r\n\x06\x04\0\x04\x01\x02\0\x12\x03\x18\x08\x10\n\x0e\n\x07\x04\0\
\x04\x01\x02\0\x01\x12\x03\x18\x08\x0b\n\x0e\n\x07\x04\0\x04\x01\x02\0\
\x02\x12\x03\x18\x0e\x0f\n\r\n\x06\x04\0\x04\x01\x02\x01\x12\x03\x19\x08\
\x11\n\x0e\n\x07\x04\0\x04\x01\x02\x01\x01\x12\x03\x19\x08\x0c\n\x0e\n\
\x07\x04\0\x04\x01\x02\x01\x02\x12\x03\x19\x0f\x10\n\r\n\x06\x04\0\x04\
\x01\x02\x02\x12\x03\x1a\x08\x13\n\x0e\n\x07\x04\0\x04\x01\x02\x02\x01\
\x12\x03\x1a\x08\x0e\n\x0e\n\x07\x04\0\x04\x01\x02\x02\x02\x12\x03\x1a\
\x11\x12\n!\n\x06\x04\0\x04\x01\x02\x03\x12\x03\x1b\x08\x14\"\x12\x20is\
\x20peer\x20a\x20relay?\n\n\x0e\n\x07\x04\0\x04\x01\x02\x03\x01\x12\x03\
\x1b\x08\x0f\n\x0e\n\x07\x04\0\x04\x01\x02\x03\x02\x12\x03\x1b\x12\x13\n\
\x0c\n\x04\x04\0\x03\0\x12\x04\x1e\x04!\x05\n\x0c\n\x05\x04\0\x03\0\x01\
\x12\x03\x1e\x0c\x10\n\x18\n\x06\x04\0\x03\0\x02\0\x12\x03\x1f\x08\x1e\"\
\t\x20peer\x20id\n\n\x0e\n\x07\x04\0\x03\0\x02\0\x04\x12\x03\x1f\x08\x10\
\n\x0e\n\x07\x04\0\x03\0\x02\0\x05\x12\x03\x1f\x11\x16\n\x0e\n\x07\x04\0\
\x03\0\x02\0\x01\x12\x03\x1f\x17\x19\n\x0e\n\x07\x04\0\x03\0\x02\0\x03\
\x12\x03\x1f\x1c\x1d\n'\n\x06\x04\0\x03\0\x02\x01\x12\x03\x20\x08!\"\x18\
\x20peer's\x20known\x20addresses\n\n\x0e\n\x07\x04\0\x03\0\x02\x01\x04\
\x12\x03\x20\x08\x10\n\x0e\n\x07\x04\0\x03\0\x02\x01\x05\x12\x03\x20\x11\
\x16\n\x0e\n\x07\x04\0\x03\0\x02\x01\x01\x12\x03\x20\x17\x1c\n\x0e\n\x07\
\x04\0\x03\0\x02\x01\x03\x12\x03\x20\x1f\x20\n\"\n\x04\x04\0\x02\0\x12\
\x03#\x04\x1b\"\x15\x20Type\x20of\x20the\x20message\n\n\x0c\n\x05\x04\0\
\x02\0\x04\x12\x03#\x04\x0c\n\x0c\n\x05\x04\0\x02\0\x06\x12\x03#\r\x11\n\
\x0c\n\x05\x04\0\x02\0\x01\x12\x03#\x12\x16\n\x0c\n\x05\x04\0\x02\0\x03\
\x12\x03#\x19\x1a\nD\n\x04\x04\0\x02\x01\x12\x03%\x04\x1e\"7\x20srcPeer\
\x20and\x20dstPeer\x20are\x20used\x20when\x20Type\x20is\x20HOP\x20or\x20\
STOP\n\n\x0c\n\x05\x04\0\x02\x01\x04\x12\x03%\x04\x0c\n\x0c\n\x05\x04\0\
\x02\x01\x06\x12\x03%\r\x11\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03%\x12\
\x19\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03%\x1c\x1d\n\x0b\n\x04\x04\0\
\x02\x02\x12\x03&\x04\x1e\n\x0c\n\x05\x04\0\x02\x02\x04\x12\x03&\x04\x0c\
\n\x0c\n\x05\x04\0\x02\x02\x06\x12\x03&\r\x11\n\x0c\n\x05\x04\0\x02\x02\
\x01\x12\x03&\x12\x19\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03&\x1c\x1d\n4\
\n\x04\x04\0\x02\x03\x12\x03(\x04\x1d\"'\x20Status\x20code,\x20used\x20w\
hen\x20Type\x20is\x20STATUS\n\n\x0c\n\x05\x04\0\x02\x03\x04\x12\x03(\x04\
\x0c\n\x0c\n\x05\x04\0\x02\x03\x06\x12\x03(\r\x13\n\x0c\n\x05\x04\0\x02\
\x03\x01\x12\x03(\x14\x18\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03(\x1b\x1c\
";
static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto,
};
fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap()
}
pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
unsafe {
file_descriptor_proto_lazy.get(|| {
parse_descriptor_proto()
})
}
}

View File

@ -1,322 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use crate::{
copy,
error::RelayError,
message::{CircuitRelay, CircuitRelay_Peer, CircuitRelay_Status, CircuitRelay_Type},
utility::{io_err, is_success, status, Io, Peer}
};
use futures::{stream, future::{self, Either::{A, B}, FutureResult}, prelude::*};
use libp2p_core::{
transport::Transport,
upgrade::{apply_outbound, InboundUpgrade, OutboundUpgrade, UpgradeInfo}
};
use log::debug;
use peerstore::{PeerAccess, PeerId, Peerstore};
use std::{io, iter, ops::Deref};
use tokio_io::{AsyncRead, AsyncWrite};
use void::Void;
#[derive(Debug, Clone)]
pub struct RelayConfig<T, P> {
my_id: PeerId,
dialer: T,
peers: P,
// If `allow_relays` is false this node can only be used as a
// destination but will not allow relaying streams to other
// destinations.
allow_relays: bool
}
// The `RelayConfig` upgrade can serve as destination or relay. Each mode needs a different
// output type. As destination we want the stream to continue to be usable, whereas as relay
// we pipe data from source to destination and do not want to use the stream in any other way.
// Therefore, in the latter case we simply return a future that can be driven to completion
// but otherwise the stream is not programmatically accessible.
pub enum Output<C> {
Stream(C),
Sealed(Box<Future<Item=(), Error=io::Error> + Send>)
}
impl<T, P> UpgradeInfo for RelayConfig<T, P> {
type UpgradeId = ();
type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>;
fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/libp2p/relay/circuit/0.1.0"), ()))
}
}
impl<C, T, P, S> InboundUpgrade<C> for RelayConfig<T, P>
where
C: AsyncRead + AsyncWrite + Send + 'static,
T: Transport + Clone + Send + 'static,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::Output: AsyncRead + AsyncWrite + Send,
P: Deref<Target=S> + Clone + Send + 'static,
S: 'static,
for<'a> &'a S: Peerstore
{
type Output = Output<C>;
type Error = RelayError<Void>;
type Future = Box<Future<Item=Self::Output, Error=Self::Error> + Send>;
fn upgrade_inbound(self, conn: C, _: ()) -> Self::Future {
let future = Io::new(conn).recv().from_err().and_then(move |(message, io)| {
let msg = if let Some(m) = message {
m
} else {
return A(A(future::err(RelayError::Message("no message received"))))
};
match msg.get_field_type() {
CircuitRelay_Type::HOP if self.allow_relays => { // act as relay
B(A(self.on_hop(msg, io).map(|fut| Output::Sealed(Box::new(fut)))))
}
CircuitRelay_Type::STOP => { // act as destination
B(B(self.on_stop(msg, io).from_err().map(Output::Stream)))
}
other => {
debug!("invalid message type: {:?}", other);
let resp = status(CircuitRelay_Status::MALFORMED_MESSAGE);
A(B(io.send(resp).from_err().and_then(|_| Err(RelayError::Message("invalid message type")))))
}
}
});
Box::new(future)
}
}
impl<T, P, S> RelayConfig<T, P>
where
T: Transport + Clone + 'static,
T::Dial: Send, // TODO: remove
T::Listener: Send, // TODO: remove
T::ListenerUpgrade: Send, // TODO: remove
T::Output: Send + AsyncRead + AsyncWrite,
P: Deref<Target = S> + Clone + 'static,
for<'a> &'a S: Peerstore,
{
pub fn new(my_id: PeerId, dialer: T, peers: P) -> RelayConfig<T, P> {
RelayConfig { my_id, dialer, peers, allow_relays: true }
}
pub fn allow_relays(&mut self, val: bool) {
self.allow_relays = val
}
// HOP message handling (relay mode).
fn on_hop<C>(self, mut msg: CircuitRelay, io: Io<C>) -> impl Future<Item=impl Future<Item=(), Error=io::Error>, Error=RelayError<Void>>
where
C: AsyncRead + AsyncWrite + 'static,
{
let from = if let Some(peer) = Peer::from_message(msg.take_srcPeer()) {
peer
} else {
let msg = status(CircuitRelay_Status::HOP_SRC_MULTIADDR_INVALID);
return A(io.send(msg).from_err().and_then(|_| Err(RelayError::Message("invalid src address"))))
};
let mut dest = if let Some(peer) = Peer::from_message(msg.take_dstPeer()) {
peer
} else {
let msg = status(CircuitRelay_Status::HOP_DST_MULTIADDR_INVALID);
return B(A(io.send(msg).from_err().and_then(|_| Err(RelayError::Message("invalid dest address")))))
};
if dest.addrs.is_empty() {
// Add locally know addresses of destination
if let Some(peer) = self.peers.peer(&dest.id) {
dest.addrs.extend(peer.addrs())
}
}
let stop = stop_message(&from, &dest);
let dialer = self.dialer;
let future = stream::iter_ok(dest.addrs.into_iter())
.and_then(move |dest_addr| {
dialer.clone().dial(dest_addr).map_err(|_| RelayError::Message("could no dial addr"))
})
.and_then(|outbound| outbound.from_err().and_then(|c| apply_outbound(c, TrivialUpgrade).from_err()))
.then(|result| Ok(result.ok()))
.filter_map(|result| result)
.into_future()
.map_err(|(err, _stream)| err)
.and_then(move |(ok, _stream)| {
if let Some(c) = ok {
// send STOP message to destination and expect back a SUCCESS message
let future = Io::new(c)
.send(stop)
.and_then(Io::recv)
.from_err()
.and_then(|(response, io)| {
let rsp = match response {
Some(m) => m,
None => return Err(RelayError::Message("no message from destination"))
};
if is_success(&rsp) {
Ok(io.into())
} else {
Err(RelayError::Message("no success response from relay"))
}
});
A(future)
} else {
B(future::err(RelayError::Message("could not dial peer")))
}
})
// signal success or failure to source
.then(move |result| {
match result {
Ok(c) => {
let msg = status(CircuitRelay_Status::SUCCESS);
A(io.send(msg).map(|io| (io.into(), c)).from_err())
}
Err(e) => {
let msg = status(CircuitRelay_Status::HOP_CANT_DIAL_DST);
B(io.send(msg).from_err().and_then(|_| Err(e)))
}
}
})
// return future for bidirectional data transfer
.and_then(move |(src, dst)| {
let future = {
let (src_r, src_w) = src.split();
let (dst_r, dst_w) = dst.split();
let a = copy::flushing_copy(src_r, dst_w).map(|_| ());
let b = copy::flushing_copy(dst_r, src_w).map(|_| ());
a.select(b).map(|_| ()).map_err(|(e, _)| e)
};
Ok(future)
});
B(B(future))
}
// STOP message handling (destination mode)
fn on_stop<C>(self, mut msg: CircuitRelay, io: Io<C>) -> impl Future<Item=C, Error=io::Error>
where
C: AsyncRead + AsyncWrite + 'static,
{
let dest = if let Some(peer) = Peer::from_message(msg.take_dstPeer()) {
peer
} else {
let msg = status(CircuitRelay_Status::STOP_DST_MULTIADDR_INVALID);
return A(io.send(msg).and_then(|_| Err(io_err("invalid dest address"))))
};
if dest.id != self.my_id {
let msg = status(CircuitRelay_Status::STOP_RELAY_REFUSED);
return B(A(io.send(msg).and_then(|_| Err(io_err("destination id mismatch")))))
}
B(B(io.send(status(CircuitRelay_Status::SUCCESS)).map(Io::into)))
}
}
fn stop_message(from: &Peer, dest: &Peer) -> CircuitRelay {
let mut msg = CircuitRelay::new();
msg.set_field_type(CircuitRelay_Type::STOP);
let mut f = CircuitRelay_Peer::new();
f.set_id(from.id.as_bytes().to_vec());
for a in &from.addrs {
f.mut_addrs().push(a.to_bytes())
}
msg.set_srcPeer(f);
let mut d = CircuitRelay_Peer::new();
d.set_id(dest.id.as_bytes().to_vec());
for a in &dest.addrs {
d.mut_addrs().push(a.to_bytes())
}
msg.set_dstPeer(d);
msg
}
#[derive(Debug, Clone)]
struct TrivialUpgrade;
impl UpgradeInfo for TrivialUpgrade {
type UpgradeId = ();
type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>;
fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/libp2p/relay/circuit/0.1.0"), ()))
}
}
impl<C> OutboundUpgrade<C> for TrivialUpgrade
where
C: AsyncRead + AsyncWrite + 'static
{
type Output = C;
type Error = Void;
type Future = FutureResult<Self::Output, Self::Error>;
fn upgrade_outbound(self, conn: C, _: ()) -> Self::Future {
future::ok(conn)
}
}
#[derive(Debug, Clone)]
pub(crate) struct Source(pub(crate) CircuitRelay);
impl UpgradeInfo for Source {
type UpgradeId = ();
type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>;
fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/libp2p/relay/circuit/0.1.0"), ()))
}
}
impl<C> OutboundUpgrade<C> for Source
where
C: AsyncRead + AsyncWrite + Send + 'static,
{
type Output = C;
type Error = io::Error;
type Future = Box<Future<Item=Self::Output, Error=Self::Error> + Send>;
fn upgrade_outbound(self, conn: C, _: ()) -> Self::Future {
let future = Io::new(conn)
.send(self.0)
.and_then(Io::recv)
.and_then(|(response, io)| {
let rsp = match response {
Some(m) => m,
None => return Err(io_err("no message from relay")),
};
if is_success(&rsp) {
Ok(io.into())
} else {
Err(io_err("no success response from relay"))
}
});
Box::new(future)
}
}

View File

@ -1,222 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{
error::RelayError,
message::{CircuitRelay, CircuitRelay_Peer, CircuitRelay_Type},
protocol,
utility::{Peer, RelayAddr}
};
use futures::{future, stream, prelude::*};
use libp2p_core::{transport::Transport, upgrade::apply_outbound};
use log::{debug, info, trace};
use multiaddr::Multiaddr;
use peerstore::{PeerAccess, PeerId, Peerstore};
use rand::prelude::*;
use std::{io, iter::FromIterator, ops::Deref, sync::Arc};
use tokio_io::{AsyncRead, AsyncWrite};
#[derive(Debug, Clone)]
pub struct RelayTransport<T, P> {
my_id: PeerId,
transport: T,
peers: P,
relays: Arc<Vec<PeerId>>
}
impl<T, P, S> Transport for RelayTransport<T, P>
where
T: Transport + Send + Clone + 'static,
T::Dial: Send,
T::Output: AsyncRead + AsyncWrite + Send,
P: Deref<Target=S> + Clone + 'static,
S: 'static,
for<'a> &'a S: Peerstore
{
type Output = T::Output;
type Listener = stream::Empty<(Self::ListenerUpgrade, Multiaddr), io::Error>;
type ListenerUpgrade = future::Empty<Self::Output, io::Error>;
type Dial = Box<Future<Item=Self::Output, Error=io::Error> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Err((self, addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
match RelayAddr::parse(&addr) {
RelayAddr::Malformed => {
debug!("malformed address: {}", addr);
return Err((self, addr));
}
RelayAddr::Multihop => {
debug!("multihop address: {}", addr);
return Err((self, addr));
}
RelayAddr::Address { relay, dest } => {
if let Some(ref r) = relay {
let f = self.relay_via(r, &dest).map_err(|this| (this, addr))?;
Ok(Box::new(f.map_err(|e| io::Error::new(io::ErrorKind::Other, e))))
} else {
let f = self.relay_to(&dest).map_err(|this| (this, addr))?;
Ok(Box::new(f.map_err(|e| io::Error::new(io::ErrorKind::Other, e))))
}
}
}
}
fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option<Multiaddr> {
self.transport.nat_traversal(a, b)
}
}
impl<T, P, S> RelayTransport<T, P>
where
T: Transport + Clone + 'static,
T::Dial: Send,
T::Output: AsyncRead + AsyncWrite + Send,
P: Deref<Target=S> + Clone + 'static,
for<'a> &'a S: Peerstore
{
/// Create a new relay transport.
///
/// This transport uses a static set of relays and will not attempt
/// any relay discovery.
pub fn new<R>(my_id: PeerId, transport: T, peers: P, relays: R) -> Self
where
R: IntoIterator<Item = PeerId>,
{
RelayTransport {
my_id,
transport,
peers,
relays: Arc::new(Vec::from_iter(relays)),
}
}
// Relay to destination over any available relay node.
fn relay_to(self, destination: &Peer) -> Result<impl Future<Item=T::Output, Error=RelayError<io::Error>>, Self> {
trace!("relay_to {:?}", destination.id);
let mut dials = Vec::new();
for relay in &*self.relays {
let relay_peer = Peer {
id: relay.clone(),
addrs: Vec::new(),
};
if let Ok(dial) = self.clone().relay_via(&relay_peer, destination) {
dials.push(dial)
}
}
if dials.is_empty() {
info!("no relay available for {:?}", destination.id);
return Err(self);
}
// Try one relay after another and stick to the first working one.
dials.shuffle(&mut thread_rng()); // randomise to spread load
let dest_peer = destination.id.clone();
let future = stream::iter_ok(dials.into_iter())
.and_then(|dial| dial)
.then(|result| Ok(result.ok()))
.filter_map(|result| result)
.into_future()
.map_err(|(err, _stream)| err)
.and_then(move |(ok, _stream)| {
if let Some(out) = ok {
Ok(out)
} else {
Err(RelayError::NoRelayFor(dest_peer))
}
});
Ok(future)
}
// Relay to destination via the given peer.
fn relay_via(self, relay: &Peer, destination: &Peer) -> Result<impl Future<Item=T::Output, Error=RelayError<io::Error>>, Self> {
trace!("relay_via {:?} to {:?}", relay.id, destination.id);
let mut addresses = Vec::new();
if relay.addrs.is_empty() {
// try all known relay addresses
if let Some(peer) = self.peers.peer(&relay.id) {
addresses.extend(peer.addrs())
}
} else {
// use only specific relay addresses
addresses.extend(relay.addrs.iter().cloned())
}
// no relay address => bail out
if addresses.is_empty() {
info!("no available address for relay: {:?}", relay.id);
return Err(self);
}
let relay = relay.clone();
let message = self.hop_message(destination);
let upgrade = protocol::Source(message);
let dialer = self.transport;
let future = stream::iter_ok(addresses.into_iter())
.filter_map(move |addr| dialer.clone().dial(addr).ok())
.and_then(move |dial| {
let upgrade = upgrade.clone();
dial.map_err(|_| RelayError::Message("could not dial"))
.and_then(move |c| apply_outbound(c, upgrade).from_err())
})
.then(|result| Ok(result.ok()))
.filter_map(|result| result)
.into_future()
.map_err(|(err, _stream)| err)
.and_then(move |(ok, _stream)| match ok {
Some(out) => {
debug!("connected");
Ok(out)
}
None => {
info!("failed to dial to {:?}", relay.id);
Err(RelayError::Message("failed to dial to relay"))
}
});
Ok(future)
}
fn hop_message(&self, destination: &Peer) -> CircuitRelay {
let mut msg = CircuitRelay::new();
msg.set_field_type(CircuitRelay_Type::HOP);
let mut from = CircuitRelay_Peer::new();
from.set_id(self.my_id.as_bytes().to_vec());
if let Some(me) = self.peers.peer(&self.my_id) {
for a in me.addrs() {
from.mut_addrs().push(a.to_bytes())
}
}
msg.set_srcPeer(from);
let mut dest = CircuitRelay_Peer::new();
dest.set_id(destination.id.as_bytes().to_vec());
for a in &destination.addrs {
dest.mut_addrs().push(a.to_bytes())
}
msg.set_dstPeer(dest);
msg
}
}

View File

@ -1,183 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::message::{CircuitRelay, CircuitRelay_Peer, CircuitRelay_Status, CircuitRelay_Type};
use futures::{future::{self, Either}, prelude::*};
use log::trace;
use multiaddr::{Protocol, Multiaddr};
use peerstore::PeerId;
use protobuf::{self, Message};
use std::{io, error::Error, iter::FromIterator};
use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec;
pub(crate) fn is_success(msg: &CircuitRelay) -> bool {
msg.get_field_type() == CircuitRelay_Type::STATUS
&& msg.get_code() == CircuitRelay_Status::SUCCESS
}
pub(crate) fn status(s: CircuitRelay_Status) -> CircuitRelay {
let mut msg = CircuitRelay::new();
msg.set_field_type(CircuitRelay_Type::STATUS);
msg.set_code(s);
msg
}
pub(crate) struct Io<T> {
codec: Framed<T, codec::UviBytes<Vec<u8>>>,
}
impl<T: AsyncRead + AsyncWrite> Io<T> {
pub(crate) fn new(c: T) -> Io<T> {
Io {
codec: Framed::new(c, codec::UviBytes::default()),
}
}
pub(crate) fn into(self) -> T {
self.codec.into_inner()
}
}
impl<T> Io<T>
where
T: AsyncRead + AsyncWrite + 'static,
{
pub(crate) fn send(self, msg: CircuitRelay) -> impl Future<Item=Self, Error=io::Error> {
trace!("sending protocol message: type={:?}, code={:?}",
msg.get_field_type(),
msg.get_code());
let pkg = match msg.write_to_bytes() {
Ok(p) => p,
Err(e) => return Either::A(future::err(io_err(e)))
};
Either::B(self.codec.send(pkg).map(|codec| Io { codec }))
}
pub(crate) fn recv(self) -> impl Future<Item=(Option<CircuitRelay>, Self), Error=io::Error> {
self.codec
.into_future()
.map_err(|(e, _)| io_err(e))
.and_then(|(pkg, codec)| {
if let Some(ref p) = pkg {
protobuf::parse_from_bytes(p)
.map(|msg: CircuitRelay| {
trace!("received protocol message: type={:?}, code={:?}",
msg.get_field_type(),
msg.get_code());
(Some(msg), Io { codec })
})
.map_err(io_err)
} else {
Ok((None, Io { codec }))
}
})
}
}
pub(crate) enum RelayAddr {
Address { relay: Option<Peer>, dest: Peer },
Malformed,
Multihop, // Unsupported
}
impl RelayAddr {
// Address format: [<relay>]/p2p-circuit/<destination>
pub(crate) fn parse(addr: &Multiaddr) -> RelayAddr {
let mut iter = addr.iter().peekable();
let relay = if let Some(&Protocol::P2pCircuit) = iter.peek() {
None // Address begins with "p2p-circuit", i.e. no relay is specified.
} else {
let prefix = iter.by_ref().take_while(|p| *p != Protocol::P2pCircuit);
match Peer::from(Multiaddr::from_iter(prefix)) {
None => return RelayAddr::Malformed,
peer => peer,
}
};
// After the (optional) relay, "p2p-circuit" is expected.
if Some(Protocol::P2pCircuit) != iter.next() {
return RelayAddr::Malformed;
}
let dest = {
let suffix = iter.by_ref().take_while(|p| *p != Protocol::P2pCircuit);
match Peer::from(Multiaddr::from_iter(suffix)) {
None => return RelayAddr::Malformed,
Some(p) => p,
}
};
if iter.next().is_some() {
return RelayAddr::Multihop;
}
RelayAddr::Address { relay, dest }
}
}
#[derive(Debug, Clone)]
pub(crate) struct Peer {
pub(crate) id: PeerId,
pub(crate) addrs: Vec<Multiaddr>,
}
impl Peer {
pub(crate) fn from(mut addr: Multiaddr) -> Option<Peer> {
match addr.pop() {
Some(Protocol::P2p(id)) => {
PeerId::from_multihash(id).ok().map(|pid| {
if addr.iter().count() == 0 {
Peer {
id: pid,
addrs: Vec::new(),
}
} else {
Peer {
id: pid,
addrs: vec![addr],
}
}
})
}
_ => None,
}
}
pub(crate) fn from_message(mut m: CircuitRelay_Peer) -> Option<Peer> {
let pid = PeerId::from_bytes(m.take_id()).ok()?;
let mut addrs = Vec::new();
for a in m.take_addrs().into_iter() {
if let Ok(ma) = Multiaddr::from_bytes(a) {
addrs.push(ma)
}
}
Some(Peer { id: pid, addrs })
}
}
pub(crate) fn io_err<E>(e: E) -> io::Error
where
E: Into<Box<Error + Send + Sync>>,
{
io::Error::new(io::ErrorKind::Other, e)
}