Merge remote-tracking branch 'upstream/master' into websockets

This commit is contained in:
Pierre Krieger 2018-01-10 18:10:57 +01:00
commit c60fb982d2
No known key found for this signature in database
GPG Key ID: A03CE3AD81F08F7C
26 changed files with 1549 additions and 253 deletions

View File

@ -1,5 +1,9 @@
[workspace] [workspace]
members = [ members = [
"multistream-select",
"datastore",
"example",
"libp2p-identify",
"libp2p-peerstore", "libp2p-peerstore",
"libp2p-ping", "libp2p-ping",
"libp2p-secio", "libp2p-secio",

View File

@ -14,6 +14,8 @@ Architecture of the crates of this repository:
- `datastore`: Utility library whose API provides a key-value storage with multiple possible - `datastore`: Utility library whose API provides a key-value storage with multiple possible
backends. Used by `peerstore`. backends. Used by `peerstore`.
- `example`: Example usages of this library. - `example`: Example usages of this library.
- `libp2p-identify`: Protocol implementation that allows a node A to query another node B what
information B knows about A. Implements the `ConnectionUpgrade` trait of `libp2p-swarm`.
- `libp2p-peerstore`: Generic storage for information about remote peers (their multiaddresses and - `libp2p-peerstore`: Generic storage for information about remote peers (their multiaddresses and
their public key), with multiple possible backends. Each multiaddress also has a time-to-live. their public key), with multiple possible backends. Each multiaddress also has a time-to-live.
Used by `libp2p-swarm`. Used by `libp2p-swarm`.
@ -29,3 +31,17 @@ Architecture of the crates of this repository:
upgrade. upgrade.
- `rw-stream-sink`: Utility library that makes it possible to wrap around a tokio `Stream + Sink` - `rw-stream-sink`: Utility library that makes it possible to wrap around a tokio `Stream + Sink`
of bytes and implements `AsyncRead + AsyncWrite`. of bytes and implements `AsyncRead + AsyncWrite`.
## About the `impl Trait` syntax
Right now a lot of code of this library uses `Box<Future>` or `Box<Stream>` objects, or forces
`'static` lifetime bounds.
This is caused by the lack of a stable `impl Trait` syntax in the Rust language. Once this syntax
is fully implemented and stabilized, it will be possible to change this code to use plain and
non-static objects instead of boxes.
Progress for the `impl Trait` syntax can be tracked in [this issue of the Rust repository](https://github.com/rust-lang/rust/issues/34511).
Once this syntax is stable in the nightly version, we will consider requiring the nightly version
of the compiler and switching to this syntax.

View File

@ -31,7 +31,7 @@ extern crate tokio_io;
use bytes::BytesMut; use bytes::BytesMut;
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
use std::env; use std::env;
use swarm::{UpgradeExt, SimpleProtocol, Transport, MuxedTransport}; use swarm::{UpgradeExt, SimpleProtocol, Transport, DeniedConnectionUpgrade};
use tcp::TcpConfig; use tcp::TcpConfig;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use tokio_io::codec::length_delimited; use tokio_io::codec::length_delimited;
@ -77,51 +77,51 @@ fn main() {
// a `Transport`. // a `Transport`.
.into_connection_reuse(); .into_connection_reuse();
let transport_with_echo = transport // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming
.clone() // connections for us. The second parameter we pass is the connection upgrade that is accepted
// On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol // by the listening part. We don't want to accept anything, so we pass a dummy object that
// just for this example. // represents a connection that is always denied.
// For this purpose, we create a `SimpleProtocol` struct. let (swarm_controller, swarm_future) = swarm::swarm(transport, DeniedConnectionUpgrade,
.with_upgrade(SimpleProtocol::new("/echo/1.0.0", |socket| { |_socket, _client_addr| -> Result<(), _> {
// This closure is called whenever a stream using the "echo" protocol has been unreachable!("All incoming connections should have been denied")
// successfully negotiated. The parameter is the raw socket (implements the AsyncRead });
// and AsyncWrite traits), and the closure must return an implementation of
// `IntoFuture` that can yield any type of object.
Ok(length_delimited::Framed::<_, BytesMut>::new(socket))
}));
// We now have a `transport` variable that can be used either to dial nodes or listen to // Building a struct that represents the protocol that we are going to use for dialing.
// incoming connections, and that will automatically apply all the selected protocols on top let proto = SimpleProtocol::new("/echo/1.0.0", |socket| {
// of any opened stream. // This closure is called whenever a stream using the "echo" protocol has been
// successfully negotiated. The parameter is the raw socket (implements the AsyncRead
// and AsyncWrite traits), and the closure must return an implementation of
// `IntoFuture` that can yield any type of object.
Ok(length_delimited::Framed::<_, BytesMut>::new(socket))
});
// We use it to dial the address. // We now use the controller to dial to the address.
let dialer = transport_with_echo swarm_controller
.dial(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr")) .dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), proto, |echo| {
// If the multiaddr protocol exists but is not supported, then we get an error containing // `echo` is what the closure used when initializing `proto` returns.
// the transport and the original multiaddress. Therefore we cannot directly use `unwrap()`
// or `expect()`, but have to add a `map_err()` beforehand.
.map_err(|(_, addr)| addr).expect("unsupported multiaddr")
.and_then(|echo| {
// `echo` is what the closure used when initializing "echo" returns.
// Consequently, please note that the `send` method is available only because the type // Consequently, please note that the `send` method is available only because the type
// `length_delimited::Framed` has a `send` method. // `length_delimited::Framed` has a `send` method.
println!("Sending \"hello world\" to listener"); println!("Sending \"hello world\" to listener");
echo.send("hello world".into()) echo.send("hello world".into())
}) // Then listening for one message from the remote.
.and_then(|echo| { .and_then(|echo| {
// The message has been successfully sent. Now wait for an answer. echo.into_future().map_err(|(e, _)| e).map(|(n,_ )| n)
echo.into_future()
.map(|(msg, rest)| {
println!("Received message from listener: {:?}", msg);
rest
}) })
.map_err(|(err, _)| err) .and_then(|message| {
}); println!("Received message from listener: {:?}", message.unwrap());
Ok(())
})
})
// If the multiaddr protocol exists but is not supported, then we get an error containing
// the original multiaddress.
.expect("unsupported multiaddr");
// `dialer` is a future that contains all the behaviour that we want, but nothing has actually // The address we actually listen on can be different from the address that was passed to
// started yet. Because we created the `TcpConfig` with tokio, we need to run the future // the `listen_on` function. For example if you pass `/ip4/0.0.0.0/tcp/0`, then the port `0`
// through the tokio core. // will be replaced with the actual port.
core.run(dialer.map(|_| ()).select(transport.incoming().for_each(|_| Ok(()))))
.unwrap_or_else(|_| panic!()); // `swarm_future` is a future that contains all the behaviour that we want, but nothing has
// actually started yet. Because we created the `TcpConfig` with tokio, we need to run the
// future through the tokio core.
core.run(swarm_future).unwrap();
} }

View File

@ -74,80 +74,66 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for // `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`. // a `Transport`.
.into_connection_reuse() .into_connection_reuse();
// On top of both mutiplex and plaintext/secio, we use the "echo" protocol, which is a
// custom protocol just for this example.
// For this purpose, we create a `SimpleProtocol` struct.
.with_upgrade(SimpleProtocol::new("/echo/1.0.0", |socket| {
// This closure is called whenever a stream using the "echo" protocol has been
// successfully negotiated. The parameter is the raw socket (implements the AsyncRead
// and AsyncWrite traits), and the closure must return an implementation of
// `IntoFuture` that can yield any type of object.
Ok(length_delimited::Framed::new(socket))
}));
// We now have a `transport` variable that can be used either to dial nodes or listen to // We now have a `transport` variable that can be used either to dial nodes or listen to
// incoming connections, and that will automatically apply all the selected protocols on top // incoming connections, and that will automatically apply secio and multiplex on top
// of any opened stream. // of any opened stream.
// We use it to listen on the address. // We now prepare the protocol that we are going to negotiate with nodes that open a connection
let (listener, address) = transport // or substream to our server.
.listen_on(swarm::Multiaddr::new(&listen_addr).expect("invalid multiaddr")) let proto = SimpleProtocol::new("/echo/1.0.0", |socket| {
// This closure is called whenever a stream using the "echo" protocol has been
// successfully negotiated. The parameter is the raw socket (implements the AsyncRead
// and AsyncWrite traits), and the closure must return an implementation of
// `IntoFuture` that can yield any type of object.
Ok(length_delimited::Framed::<_, bytes::BytesMut>::new(socket))
});
// Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and
// outgoing connections for us.
let (swarm_controller, swarm_future) = swarm::swarm(transport, proto, |socket, client_addr| {
println!("Successfully negotiated protocol with {}", client_addr);
// The type of `socket` is exactly what the closure of `SimpleProtocol` returns.
// We loop forever in order to handle all the messages sent by the client.
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket
.into_future()
.map_err(|(e, _)| e)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
})
});
// We now use the controller to listen on the address.
let address = swarm_controller
.listen_on(listen_addr.parse().expect("invalid multiaddr"))
// If the multiaddr protocol exists but is not supported, then we get an error containing // If the multiaddr protocol exists but is not supported, then we get an error containing
// the transport and the original multiaddress. Therefore we cannot directly use `unwrap()` // the original multiaddress.
// or `expect()`, but have to add a `map_err()` beforehand. .expect("unsupported multiaddr");
.map_err(|(_, addr)| addr).expect("unsupported multiaddr"); // The address we actually listen on can be different from the address that was passed to
// the `listen_on` function. For example if you pass `/ip4/0.0.0.0/tcp/0`, then the port `0`
// will be replaced with the actual port.
println!("Now listening on {:?}", address); println!("Now listening on {:?}", address);
let future = listener // `swarm_future` is a future that contains all the behaviour that we want, but nothing has
.for_each(|(socket, client_addr)| { // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the
// This closure is called whenever a new connection has been received. // future through the tokio core.
// `socket` is a future that will be triggered once the upgrade to secio, multiplex core.run(swarm_future).unwrap();
// and echo is complete.
let client_addr = client_addr.to_string();
println!("Incoming connection from {}", client_addr);
socket
.and_then(move |socket| {
println!("Successfully negotiated protocol with {}", client_addr);
// We loop forever in order to handle all the messages sent by the client.
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket.into_future()
.map_err(|(err, _)| err)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
})
})
// We absorb errors from the future so that an error while processing a client
// (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
// entire server.
.then(move |res| {
if let Err(err) = res {
println!("Error while processing client: {:?}", err);
}
Ok(())
})
});
// `future` is a future that contains all the behaviour that we want, but nothing has actually
// started yet. Because we created the `TcpConfig` with tokio, we need to run the future
// through the tokio core.
core.run(future).unwrap();
} }

View File

@ -0,0 +1,17 @@
[package]
name = "libp2p-identify"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
bytes = "0.4"
futures = "0.1"
libp2p-peerstore = { path = "../libp2p-peerstore" }
libp2p-swarm = { path = "../libp2p-swarm" }
multiaddr = "0.2.0"
protobuf = "1.4.2"
tokio-io = "0.1.0"
[dev-dependencies]
libp2p-tcp-transport = { path = "../libp2p-tcp-transport" }
tokio-core = "0.1.0"

View File

@ -0,0 +1,12 @@
#!/bin/sh
# This script regenerates the `src/structs_proto.rs` and `src/keys_proto.rs` files from
# `structs.proto` and `keys.proto`.
sudo docker run --rm -v `pwd`:/usr/code:z -w /usr/code rust /bin/bash -c " \
apt-get update; \
apt-get install -y protobuf-compiler; \
cargo install protobuf; \
protoc --rust_out . structs.proto"
mv -f structs.rs ./src/structs_proto.rs

225
libp2p-identify/src/lib.rs Normal file
View File

@ -0,0 +1,225 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Implementation of the `/ipfs/id/1.0.0` protocol. Allows a node A to query another node B which
//! information B knows about A. Also includes the addresses B is listening on.
//!
//! When two nodes connect to each other, the listening half sends a message to the dialing half,
//! indicating the information, and then the protocol stops.
extern crate bytes;
extern crate futures;
extern crate multiaddr;
extern crate libp2p_peerstore;
extern crate libp2p_swarm;
extern crate protobuf;
extern crate tokio_io;
use bytes::{Bytes, BytesMut};
use futures::{Future, Stream, Sink};
use libp2p_swarm::{ConnectionUpgrade, Endpoint};
use multiaddr::Multiaddr;
use protobuf::Message as ProtobufMessage;
use protobuf::core::parse_from_bytes as protobuf_parse_from_bytes;
use protobuf::repeated::RepeatedField;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
mod structs_proto;
/// Prototype for an upgrade to the identity protocol.
#[derive(Debug, Clone)]
pub struct IdentifyProtocol {
pub information: IdentifyInfo,
}
impl IdentifyProtocol {
/// Builds a new `IdentifyProtocol`.
#[inline]
pub fn new(information: IdentifyInfo) -> IdentifyProtocol {
IdentifyProtocol {
information
}
}
}
/// Information sent from the listener to the dialer.
#[derive(Debug, Clone)]
pub struct IdentifyInfo {
/// Public key of the node.
pub public_key: Vec<u8>,
/// Version of the "global" protocol, eg. `ipfs/1.0.0`.
pub protocol_version: String,
/// Name and version. Can be thought as similar to the `User-Agent` header of HTTP.
pub agent_version: String,
/// Addresses that are listened on.
pub listen_addrs: Vec<Multiaddr>,
/// Address that the server uses to communicate with the dialer.
pub observed_addr: Multiaddr,
/// Protocols supported by the remote.
pub protocols: Vec<String>,
}
impl<C> ConnectionUpgrade<C> for IdentifyProtocol
where C: AsyncRead + AsyncWrite + 'static
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
type Output = Option<IdentifyInfo>;
type Future = Box<Future<Item = Self::Output, Error = IoError>>;
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/ipfs/id/1.0.0"), ()))
}
fn upgrade(self, socket: C, _: (), ty: Endpoint) -> Self::Future {
// TODO: use jack's varint library instead
let socket = length_delimited::Builder::new().length_field_length(1).new_framed(socket);
match ty {
Endpoint::Dialer => {
let future = socket.into_future()
.map(|(msg, _)| msg)
.map_err(|(err, _)| err)
.and_then(|msg| if let Some(msg) = msg {
Ok(Some(parse_proto_msg(msg)?))
} else {
Ok(None)
});
Box::new(future) as Box<_>
}
Endpoint::Listener => {
let info = self.information;
let listen_addrs = info.listen_addrs
.into_iter()
.map(|addr| addr.to_string().into_bytes())
.collect();
let mut message = structs_proto::Identify::new();
message.set_agentVersion(info.agent_version);
message.set_protocolVersion(info.protocol_version);
message.set_publicKey(info.public_key);
message.set_listenAddrs(listen_addrs);
message.set_observedAddr(info.observed_addr.to_string().into_bytes());
message.set_protocols(RepeatedField::from_vec(info.protocols));
let bytes = message.write_to_bytes()
.expect("writing protobuf failed ; should never happen");
// On the server side, after sending the information to the client we make the
// future produce a `None`. If we were on the client side, this would contain the
// information received by the server.
let future = socket.send(bytes).map(|_| None);
Box::new(future) as Box<_>
}
}
}
}
// Turns a protobuf message into an `IdentifyInfo`. If something bad happens, turn it into
// an `IoError`.
fn parse_proto_msg(msg: BytesMut) -> Result<IdentifyInfo, IoError> {
match protobuf_parse_from_bytes::<structs_proto::Identify>(&msg) {
Ok(mut msg) => {
let listen_addrs = {
let mut addrs = Vec::new();
for addr in msg.take_listenAddrs().into_iter() {
addrs.push(bytes_to_multiaddr(addr)?);
}
addrs
};
let observed_addr = bytes_to_multiaddr(msg.take_observedAddr())?;
Ok(IdentifyInfo {
public_key: msg.take_publicKey(),
protocol_version: msg.take_protocolVersion(),
agent_version: msg.take_agentVersion(),
listen_addrs: listen_addrs,
observed_addr: observed_addr,
protocols: msg.take_protocols().into_vec(),
})
}
Err(err) => {
Err(IoError::new(IoErrorKind::InvalidData, err))
}
}
}
// Turn a `Vec<u8>` into a `Multiaddr`. If something bad happens, turn it into an `IoError`.
fn bytes_to_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, IoError> {
String::from_utf8(bytes)
.map_err(|err| {
IoError::new(IoErrorKind::InvalidData, err)
})
.and_then(|s| {
s.parse()
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
})
}
#[cfg(test)]
mod tests {
extern crate libp2p_tcp_transport;
extern crate tokio_core;
use self::libp2p_tcp_transport::TcpConfig;
use self::tokio_core::reactor::Core;
use IdentifyInfo;
use IdentifyProtocol;
use futures::{IntoFuture, Future, Stream};
use libp2p_swarm::Transport;
#[test]
fn basic() {
let mut core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle());
let with_proto = tcp.with_upgrade(IdentifyProtocol::new(IdentifyInfo {
public_key: vec![1, 2, 3, 4],
protocol_version: "ipfs/1.0.0".to_owned(),
agent_version: "agent/version".to_owned(),
listen_addrs: vec!["/ip4/5.6.7.8/tcp/12345".parse().unwrap()],
observed_addr: "/ip4/1.2.3.4/tcp/9876".parse().unwrap(),
protocols: vec!["ping".to_owned(), "kad".to_owned()],
}));
let (server, addr) = with_proto.clone()
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let server = server.into_future()
.map_err(|(err, _)| err)
.and_then(|(n, _)| n.unwrap().0);
let dialer = with_proto.dial(addr)
.unwrap()
.into_future();
let (recv, should_be_empty) = core.run(dialer.join(server)).unwrap();
assert!(should_be_empty.is_none());
let recv = recv.unwrap();
assert_eq!(recv.public_key, &[1, 2, 3, 4]);
}
}

View File

@ -0,0 +1,550 @@
// This file is generated. Do not edit
// @generated
// https://github.com/Manishearth/rust-clippy/issues/702
#![allow(unknown_lints)]
#![allow(clippy)]
#![cfg_attr(rustfmt, rustfmt_skip)]
#![allow(box_pointers)]
#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(trivial_casts)]
#![allow(unsafe_code)]
#![allow(unused_imports)]
#![allow(unused_results)]
use protobuf::Message as Message_imported_for_functions;
use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions;
#[derive(PartialEq,Clone,Default)]
pub struct Identify {
// message fields
protocolVersion: ::protobuf::SingularField<::std::string::String>,
agentVersion: ::protobuf::SingularField<::std::string::String>,
publicKey: ::protobuf::SingularField<::std::vec::Vec<u8>>,
listenAddrs: ::protobuf::RepeatedField<::std::vec::Vec<u8>>,
observedAddr: ::protobuf::SingularField<::std::vec::Vec<u8>>,
protocols: ::protobuf::RepeatedField<::std::string::String>,
// special fields
unknown_fields: ::protobuf::UnknownFields,
cached_size: ::protobuf::CachedSize,
}
// see codegen.rs for the explanation why impl Sync explicitly
unsafe impl ::std::marker::Sync for Identify {}
impl Identify {
pub fn new() -> Identify {
::std::default::Default::default()
}
pub fn default_instance() -> &'static Identify {
static mut instance: ::protobuf::lazy::Lazy<Identify> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const Identify,
};
unsafe {
instance.get(Identify::new)
}
}
// optional string protocolVersion = 5;
pub fn clear_protocolVersion(&mut self) {
self.protocolVersion.clear();
}
pub fn has_protocolVersion(&self) -> bool {
self.protocolVersion.is_some()
}
// Param is passed by value, moved
pub fn set_protocolVersion(&mut self, v: ::std::string::String) {
self.protocolVersion = ::protobuf::SingularField::some(v);
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_protocolVersion(&mut self) -> &mut ::std::string::String {
if self.protocolVersion.is_none() {
self.protocolVersion.set_default();
}
self.protocolVersion.as_mut().unwrap()
}
// Take field
pub fn take_protocolVersion(&mut self) -> ::std::string::String {
self.protocolVersion.take().unwrap_or_else(|| ::std::string::String::new())
}
pub fn get_protocolVersion(&self) -> &str {
match self.protocolVersion.as_ref() {
Some(v) => &v,
None => "",
}
}
fn get_protocolVersion_for_reflect(&self) -> &::protobuf::SingularField<::std::string::String> {
&self.protocolVersion
}
fn mut_protocolVersion_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::string::String> {
&mut self.protocolVersion
}
// optional string agentVersion = 6;
pub fn clear_agentVersion(&mut self) {
self.agentVersion.clear();
}
pub fn has_agentVersion(&self) -> bool {
self.agentVersion.is_some()
}
// Param is passed by value, moved
pub fn set_agentVersion(&mut self, v: ::std::string::String) {
self.agentVersion = ::protobuf::SingularField::some(v);
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_agentVersion(&mut self) -> &mut ::std::string::String {
if self.agentVersion.is_none() {
self.agentVersion.set_default();
}
self.agentVersion.as_mut().unwrap()
}
// Take field
pub fn take_agentVersion(&mut self) -> ::std::string::String {
self.agentVersion.take().unwrap_or_else(|| ::std::string::String::new())
}
pub fn get_agentVersion(&self) -> &str {
match self.agentVersion.as_ref() {
Some(v) => &v,
None => "",
}
}
fn get_agentVersion_for_reflect(&self) -> &::protobuf::SingularField<::std::string::String> {
&self.agentVersion
}
fn mut_agentVersion_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::string::String> {
&mut self.agentVersion
}
// optional bytes publicKey = 1;
pub fn clear_publicKey(&mut self) {
self.publicKey.clear();
}
pub fn has_publicKey(&self) -> bool {
self.publicKey.is_some()
}
// Param is passed by value, moved
pub fn set_publicKey(&mut self, v: ::std::vec::Vec<u8>) {
self.publicKey = ::protobuf::SingularField::some(v);
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_publicKey(&mut self) -> &mut ::std::vec::Vec<u8> {
if self.publicKey.is_none() {
self.publicKey.set_default();
}
self.publicKey.as_mut().unwrap()
}
// Take field
pub fn take_publicKey(&mut self) -> ::std::vec::Vec<u8> {
self.publicKey.take().unwrap_or_else(|| ::std::vec::Vec::new())
}
pub fn get_publicKey(&self) -> &[u8] {
match self.publicKey.as_ref() {
Some(v) => &v,
None => &[],
}
}
fn get_publicKey_for_reflect(&self) -> &::protobuf::SingularField<::std::vec::Vec<u8>> {
&self.publicKey
}
fn mut_publicKey_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::vec::Vec<u8>> {
&mut self.publicKey
}
// repeated bytes listenAddrs = 2;
pub fn clear_listenAddrs(&mut self) {
self.listenAddrs.clear();
}
// Param is passed by value, moved
pub fn set_listenAddrs(&mut self, v: ::protobuf::RepeatedField<::std::vec::Vec<u8>>) {
self.listenAddrs = v;
}
// Mutable pointer to the field.
pub fn mut_listenAddrs(&mut self) -> &mut ::protobuf::RepeatedField<::std::vec::Vec<u8>> {
&mut self.listenAddrs
}
// Take field
pub fn take_listenAddrs(&mut self) -> ::protobuf::RepeatedField<::std::vec::Vec<u8>> {
::std::mem::replace(&mut self.listenAddrs, ::protobuf::RepeatedField::new())
}
pub fn get_listenAddrs(&self) -> &[::std::vec::Vec<u8>] {
&self.listenAddrs
}
fn get_listenAddrs_for_reflect(&self) -> &::protobuf::RepeatedField<::std::vec::Vec<u8>> {
&self.listenAddrs
}
fn mut_listenAddrs_for_reflect(&mut self) -> &mut ::protobuf::RepeatedField<::std::vec::Vec<u8>> {
&mut self.listenAddrs
}
// optional bytes observedAddr = 4;
pub fn clear_observedAddr(&mut self) {
self.observedAddr.clear();
}
pub fn has_observedAddr(&self) -> bool {
self.observedAddr.is_some()
}
// Param is passed by value, moved
pub fn set_observedAddr(&mut self, v: ::std::vec::Vec<u8>) {
self.observedAddr = ::protobuf::SingularField::some(v);
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_observedAddr(&mut self) -> &mut ::std::vec::Vec<u8> {
if self.observedAddr.is_none() {
self.observedAddr.set_default();
}
self.observedAddr.as_mut().unwrap()
}
// Take field
pub fn take_observedAddr(&mut self) -> ::std::vec::Vec<u8> {
self.observedAddr.take().unwrap_or_else(|| ::std::vec::Vec::new())
}
pub fn get_observedAddr(&self) -> &[u8] {
match self.observedAddr.as_ref() {
Some(v) => &v,
None => &[],
}
}
fn get_observedAddr_for_reflect(&self) -> &::protobuf::SingularField<::std::vec::Vec<u8>> {
&self.observedAddr
}
fn mut_observedAddr_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::vec::Vec<u8>> {
&mut self.observedAddr
}
// repeated string protocols = 3;
pub fn clear_protocols(&mut self) {
self.protocols.clear();
}
// Param is passed by value, moved
pub fn set_protocols(&mut self, v: ::protobuf::RepeatedField<::std::string::String>) {
self.protocols = v;
}
// Mutable pointer to the field.
pub fn mut_protocols(&mut self) -> &mut ::protobuf::RepeatedField<::std::string::String> {
&mut self.protocols
}
// Take field
pub fn take_protocols(&mut self) -> ::protobuf::RepeatedField<::std::string::String> {
::std::mem::replace(&mut self.protocols, ::protobuf::RepeatedField::new())
}
pub fn get_protocols(&self) -> &[::std::string::String] {
&self.protocols
}
fn get_protocols_for_reflect(&self) -> &::protobuf::RepeatedField<::std::string::String> {
&self.protocols
}
fn mut_protocols_for_reflect(&mut self) -> &mut ::protobuf::RepeatedField<::std::string::String> {
&mut self.protocols
}
}
impl ::protobuf::Message for Identify {
fn is_initialized(&self) -> bool {
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
5 => {
::protobuf::rt::read_singular_string_into(wire_type, is, &mut self.protocolVersion)?;
},
6 => {
::protobuf::rt::read_singular_string_into(wire_type, is, &mut self.agentVersion)?;
},
1 => {
::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.publicKey)?;
},
2 => {
::protobuf::rt::read_repeated_bytes_into(wire_type, is, &mut self.listenAddrs)?;
},
4 => {
::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.observedAddr)?;
},
3 => {
::protobuf::rt::read_repeated_string_into(wire_type, is, &mut self.protocols)?;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if let Some(ref v) = self.protocolVersion.as_ref() {
my_size += ::protobuf::rt::string_size(5, &v);
}
if let Some(ref v) = self.agentVersion.as_ref() {
my_size += ::protobuf::rt::string_size(6, &v);
}
if let Some(ref v) = self.publicKey.as_ref() {
my_size += ::protobuf::rt::bytes_size(1, &v);
}
for value in &self.listenAddrs {
my_size += ::protobuf::rt::bytes_size(2, &value);
};
if let Some(ref v) = self.observedAddr.as_ref() {
my_size += ::protobuf::rt::bytes_size(4, &v);
}
for value in &self.protocols {
my_size += ::protobuf::rt::string_size(3, &value);
};
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> {
if let Some(ref v) = self.protocolVersion.as_ref() {
os.write_string(5, &v)?;
}
if let Some(ref v) = self.agentVersion.as_ref() {
os.write_string(6, &v)?;
}
if let Some(ref v) = self.publicKey.as_ref() {
os.write_bytes(1, &v)?;
}
for v in &self.listenAddrs {
os.write_bytes(2, &v)?;
};
if let Some(ref v) = self.observedAddr.as_ref() {
os.write_bytes(4, &v)?;
}
for v in &self.protocols {
os.write_string(3, &v)?;
};
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &::std::any::Any {
self as &::std::any::Any
}
fn as_any_mut(&mut self) -> &mut ::std::any::Any {
self as &mut ::std::any::Any
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<::std::any::Any> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
::protobuf::MessageStatic::descriptor_static(None::<Self>)
}
}
impl ::protobuf::MessageStatic for Identify {
fn new() -> Identify {
Identify::new()
}
fn descriptor_static(_: ::std::option::Option<Identify>) -> &'static ::protobuf::reflect::MessageDescriptor {
static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
};
unsafe {
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"protocolVersion",
Identify::get_protocolVersion_for_reflect,
Identify::mut_protocolVersion_for_reflect,
));
fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"agentVersion",
Identify::get_agentVersion_for_reflect,
Identify::mut_agentVersion_for_reflect,
));
fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"publicKey",
Identify::get_publicKey_for_reflect,
Identify::mut_publicKey_for_reflect,
));
fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"listenAddrs",
Identify::get_listenAddrs_for_reflect,
Identify::mut_listenAddrs_for_reflect,
));
fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"observedAddr",
Identify::get_observedAddr_for_reflect,
Identify::mut_observedAddr_for_reflect,
));
fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
"protocols",
Identify::get_protocols_for_reflect,
Identify::mut_protocols_for_reflect,
));
::protobuf::reflect::MessageDescriptor::new::<Identify>(
"Identify",
fields,
file_descriptor_proto()
)
})
}
}
}
impl ::protobuf::Clear for Identify {
fn clear(&mut self) {
self.clear_protocolVersion();
self.clear_agentVersion();
self.clear_publicKey();
self.clear_listenAddrs();
self.clear_observedAddr();
self.clear_protocols();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for Identify {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for Identify {
fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
::protobuf::reflect::ProtobufValueRef::Message(self)
}
}
static file_descriptor_proto_data: &'static [u8] = b"\
\n\rstructs.proto\"\xda\x01\n\x08Identify\x12(\n\x0fprotocolVersion\x18\
\x05\x20\x01(\tR\x0fprotocolVersion\x12\"\n\x0cagentVersion\x18\x06\x20\
\x01(\tR\x0cagentVersion\x12\x1c\n\tpublicKey\x18\x01\x20\x01(\x0cR\tpub\
licKey\x12\x20\n\x0blistenAddrs\x18\x02\x20\x03(\x0cR\x0blistenAddrs\x12\
\"\n\x0cobservedAddr\x18\x04\x20\x01(\x0cR\x0cobservedAddr\x12\x1c\n\tpr\
otocols\x18\x03\x20\x03(\tR\tprotocolsJ\xc2\t\n\x06\x12\x04\0\0\x16\x01\
\n\n\n\x02\x04\0\x12\x04\0\0\x16\x01\n\n\n\x03\x04\0\x01\x12\x03\0\x08\
\x10\nX\n\x04\x04\0\x02\0\x12\x03\x02\x02&\x1a8\x20protocolVersion\x20de\
termines\x20compatibility\x20between\x20peers\n\"\x11\x20e.g.\x20ipfs/1.\
0.0\n\n\x0c\n\x05\x04\0\x02\0\x04\x12\x03\x02\x02\n\n\x0c\n\x05\x04\0\
\x02\0\x05\x12\x03\x02\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x02\
\x12!\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x02$%\n\x9f\x01\n\x04\x04\0\
\x02\x01\x12\x03\x06\x02#\x1a|\x20agentVersion\x20is\x20like\x20a\x20Use\
rAgent\x20string\x20in\x20browsers,\x20or\x20client\x20version\x20in\x20\
bittorrent\n\x20includes\x20the\x20client\x20name\x20and\x20client.\n\"\
\x14\x20e.g.\x20go-ipfs/0.1.0\n\n\x0c\n\x05\x04\0\x02\x01\x04\x12\x03\
\x06\x02\n\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x06\x0b\x11\n\x0c\n\x05\
\x04\0\x02\x01\x01\x12\x03\x06\x12\x1e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\
\x03\x06!\"\n\xe3\x01\n\x04\x04\0\x02\x02\x12\x03\x0b\x02\x1f\x1a\xd5\
\x01\x20publicKey\x20is\x20this\x20node's\x20public\x20key\x20(which\x20\
also\x20gives\x20its\x20node.ID)\n\x20-\x20may\x20not\x20need\x20to\x20b\
e\x20sent,\x20as\x20secure\x20channel\x20implies\x20it\x20has\x20been\
\x20sent.\n\x20-\x20then\x20again,\x20if\x20we\x20change\x20/\x20disable\
\x20secure\x20channel,\x20may\x20still\x20want\x20it.\n\n\x0c\n\x05\x04\
\0\x02\x02\x04\x12\x03\x0b\x02\n\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\
\x0b\x0b\x10\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x0b\x11\x1a\n\x0c\n\
\x05\x04\0\x02\x02\x03\x12\x03\x0b\x1d\x1e\n]\n\x04\x04\0\x02\x03\x12\
\x03\x0e\x02!\x1aP\x20listenAddrs\x20are\x20the\x20multiaddrs\x20the\x20\
sender\x20node\x20listens\x20for\x20open\x20connections\x20on\n\n\x0c\n\
\x05\x04\0\x02\x03\x04\x12\x03\x0e\x02\n\n\x0c\n\x05\x04\0\x02\x03\x05\
\x12\x03\x0e\x0b\x10\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x0e\x11\x1c\n\
\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x0e\x1f\x20\n\x81\x02\n\x04\x04\0\
\x02\x04\x12\x03\x13\x02\"\x1a\xf3\x01\x20oservedAddr\x20is\x20the\x20mu\
ltiaddr\x20of\x20the\x20remote\x20endpoint\x20that\x20the\x20sender\x20n\
ode\x20perceives\n\x20this\x20is\x20useful\x20information\x20to\x20conve\
y\x20to\x20the\x20other\x20side,\x20as\x20it\x20helps\x20the\x20remote\
\x20endpoint\n\x20determine\x20whether\x20its\x20connection\x20to\x20the\
\x20local\x20peer\x20goes\x20through\x20NAT.\n\n\x0c\n\x05\x04\0\x02\x04\
\x04\x12\x03\x13\x02\n\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x13\x0b\x10\
\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x13\x11\x1d\n\x0c\n\x05\x04\0\x02\
\x04\x03\x12\x03\x13\x20!\n\x0b\n\x04\x04\0\x02\x05\x12\x03\x15\x02\x20\
\n\x0c\n\x05\x04\0\x02\x05\x04\x12\x03\x15\x02\n\n\x0c\n\x05\x04\0\x02\
\x05\x05\x12\x03\x15\x0b\x11\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x15\
\x12\x1b\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03\x15\x1e\x1f\
";
static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto,
};
fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap()
}
pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
unsafe {
file_descriptor_proto_lazy.get(|| {
parse_descriptor_proto()
})
}
}

View File

@ -0,0 +1,23 @@
message Identify {
// protocolVersion determines compatibility between peers
optional string protocolVersion = 5; // e.g. ipfs/1.0.0
// agentVersion is like a UserAgent string in browsers, or client version in bittorrent
// includes the client name and client.
optional string agentVersion = 6; // e.g. go-ipfs/0.1.0
// publicKey is this node's public key (which also gives its node.ID)
// - may not need to be sent, as secure channel implies it has been sent.
// - then again, if we change / disable secure channel, may still want it.
optional bytes publicKey = 1;
// listenAddrs are the multiaddrs the sender node listens for open connections on
repeated bytes listenAddrs = 2;
// oservedAddr is the multiaddr of the remote endpoint that the sender node perceives
// this is useful information to convey to the other side, as it helps the remote endpoint
// determine whether its connection to the local peer goes through NAT.
optional bytes observedAddr = 4;
repeated string protocols = 3;
}

View File

@ -34,7 +34,7 @@ let peer_id = vec![1, 2, 3, 4];
// `peer_or_create` mutably borrows the peerstore, so we have to do it in a local scope. // `peer_or_create` mutably borrows the peerstore, so we have to do it in a local scope.
let mut peer = peerstore.peer_or_create(&peer_id); let mut peer = peerstore.peer_or_create(&peer_id);
peer.set_pub_key(vec![60, 90, 120, 150]); peer.set_pub_key(vec![60, 90, 120, 150]);
peer.add_addr(Multiaddr::new("/ip4/10.11.12.13/tcp/20000").unwrap(), peer.add_addr("/ip4/10.11.12.13/tcp/20000".parse::<Multiaddr>().unwrap(),
Duration::from_millis(5000)); Duration::from_millis(5000));
} }
@ -43,6 +43,6 @@ let peer_id = vec![1, 2, 3, 4];
let mut peer = peerstore.peer(&peer_id).expect("peer doesn't exist in the peerstore"); let mut peer = peerstore.peer(&peer_id).expect("peer doesn't exist in the peerstore");
assert_eq!(peer.get_pub_key().unwrap(), &[60, 90, 120, 150]); assert_eq!(peer.get_pub_key().unwrap(), &[60, 90, 120, 150]);
assert_eq!(peer.addrs().collect::<Vec<_>>(), assert_eq!(peer.addrs().collect::<Vec<_>>(),
&[Multiaddr::new("/ip4/10.11.12.13/tcp/20000").unwrap()]); &["/ip4/10.11.12.13/tcp/20000".parse::<Multiaddr>().unwrap()]);
} }
``` ```

View File

@ -55,7 +55,7 @@
//! // `peer_or_create` mutably borrows the peerstore, so we have to do it in a local scope. //! // `peer_or_create` mutably borrows the peerstore, so we have to do it in a local scope.
//! let mut peer = peerstore.peer_or_create(&peer_id); //! let mut peer = peerstore.peer_or_create(&peer_id);
//! peer.set_pub_key(vec![60, 90, 120, 150]); //! peer.set_pub_key(vec![60, 90, 120, 150]);
//! peer.add_addr(Multiaddr::new("/ip4/10.11.12.13/tcp/20000").unwrap(), //! peer.add_addr("/ip4/10.11.12.13/tcp/20000".parse::<Multiaddr>().unwrap(),
//! Duration::from_millis(5000)); //! Duration::from_millis(5000));
//! } //! }
//! //!
@ -64,7 +64,7 @@
//! let mut peer = peerstore.peer(&peer_id).expect("peer doesn't exist in the peerstore"); //! let mut peer = peerstore.peer(&peer_id).expect("peer doesn't exist in the peerstore");
//! assert_eq!(peer.get_pub_key().unwrap(), &[60, 90, 120, 150]); //! assert_eq!(peer.get_pub_key().unwrap(), &[60, 90, 120, 150]);
//! assert_eq!(peer.addrs().collect::<Vec<_>>(), //! assert_eq!(peer.addrs().collect::<Vec<_>>(),
//! &[Multiaddr::new("/ip4/10.11.12.13/tcp/20000").unwrap()]); //! &["/ip4/10.11.12.13/tcp/20000".parse::<Multiaddr>().unwrap()]);
//! } //! }
//! # } //! # }
//! ``` //! ```

View File

@ -162,7 +162,7 @@ impl<'de> Deserialize<'de> for PeerInfo {
let addrs = { let addrs = {
let mut out = Vec::with_capacity(interm.addrs.len()); let mut out = Vec::with_capacity(interm.addrs.len());
for (addr, since_epoch) in interm.addrs { for (addr, since_epoch) in interm.addrs {
let addr = match Multiaddr::new(&addr) { let addr = match addr.parse::<Multiaddr>() {
Ok(a) => a, Ok(a) => a,
Err(err) => return Err(DeserializerError::custom(err)), Err(err) => return Err(DeserializerError::custom(err)),
}; };

View File

@ -62,7 +62,7 @@ macro_rules! peerstore_tests {
$($stmt;)* $($stmt;)*
let peer_store = $create_peerstore; let peer_store = $create_peerstore;
let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap(); let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap();
let addr = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); let addr = "/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap();
peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(5000)); peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(5000));
@ -76,7 +76,7 @@ macro_rules! peerstore_tests {
$($stmt;)* $($stmt;)*
let peer_store = $create_peerstore; let peer_store = $create_peerstore;
let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap(); let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap();
let addr = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); let addr = "/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap();
peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(0)); peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(0));
thread::sleep(Duration::from_millis(2)); thread::sleep(Duration::from_millis(2));
@ -90,7 +90,7 @@ macro_rules! peerstore_tests {
$($stmt;)* $($stmt;)*
let peer_store = $create_peerstore; let peer_store = $create_peerstore;
let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap(); let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap();
let addr = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); let addr = "/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap();
peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(5000)); peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(5000));
peer_store.peer(&peer_id).unwrap().clear_addrs(); peer_store.peer(&peer_id).unwrap().clear_addrs();
@ -105,8 +105,8 @@ macro_rules! peerstore_tests {
let peer_store = $create_peerstore; let peer_store = $create_peerstore;
let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap(); let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap();
let addr1 = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); let addr1 = "/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap();
let addr2 = Multiaddr::new("/ip4/0.0.0.1/tcp/0").unwrap(); let addr2 = "/ip4/0.0.0.1/tcp/0".parse::<Multiaddr>().unwrap();
peer_store.peer_or_create(&peer_id).add_addr(addr1.clone(), Duration::from_millis(5000)); peer_store.peer_or_create(&peer_id).add_addr(addr1.clone(), Duration::from_millis(5000));
peer_store.peer_or_create(&peer_id).add_addr(addr2.clone(), Duration::from_millis(5000)); peer_store.peer_or_create(&peer_id).add_addr(addr2.clone(), Duration::from_millis(5000));
@ -124,8 +124,8 @@ macro_rules! peerstore_tests {
let peer_store = $create_peerstore; let peer_store = $create_peerstore;
let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap(); let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap();
let addr1 = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); let addr1 = "/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap();
let addr2 = Multiaddr::new("/ip4/0.0.0.1/tcp/0").unwrap(); let addr2 = "/ip4/0.0.0.1/tcp/0".parse::<Multiaddr>().unwrap();
peer_store.peer_or_create(&peer_id).add_addr(addr1.clone(), Duration::from_millis(5000)); peer_store.peer_or_create(&peer_id).add_addr(addr1.clone(), Duration::from_millis(5000));
peer_store.peer_or_create(&peer_id).add_addr(addr2.clone(), Duration::from_millis(5000)); peer_store.peer_or_create(&peer_id).add_addr(addr2.clone(), Duration::from_millis(5000));

View File

@ -43,7 +43,7 @@ let mut core = tokio_core::reactor::Core::new().unwrap();
let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
.with_upgrade(Ping) .with_upgrade(Ping)
.dial(libp2p_swarm::Multiaddr::new("127.0.0.1:12345").unwrap()).unwrap_or_else(|_| panic!()) .dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
.and_then(|(mut pinger, service)| { .and_then(|(mut pinger, service)| {
pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
}); });

View File

@ -67,7 +67,7 @@
//! //!
//! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) //! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
//! .with_upgrade(Ping) //! .with_upgrade(Ping)
//! .dial(libp2p_swarm::Multiaddr::new("127.0.0.1:12345").unwrap()).unwrap_or_else(|_| panic!()) //! .dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
//! .and_then(|(mut pinger, service)| { //! .and_then(|(mut pinger, service)| {
//! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) //! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
//! }); //! });

View File

@ -37,7 +37,7 @@ let transport = TcpConfig::new(core.handle())
} }
}); });
let future = transport.dial(Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap()) let future = transport.dial("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap())
.unwrap_or_else(|_| panic!("Unable to dial node")) .unwrap_or_else(|_| panic!("Unable to dial node"))
.and_then(|connection| { .and_then(|connection| {
// Sends "hello world" on the connection, will be encrypted. // Sends "hello world" on the connection, will be encrypted.

View File

@ -58,7 +58,7 @@
//! } //! }
//! }); //! });
//! //!
//! let future = transport.dial(Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap()) //! let future = transport.dial("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap())
//! .unwrap_or_else(|_| panic!("Unable to dial node")) //! .unwrap_or_else(|_| panic!("Unable to dial node"))
//! .and_then(|connection| { //! .and_then(|connection| {
//! // Sends "hello world" on the connection, will be encrypted. //! // Sends "hello world" on the connection, will be encrypted.

View File

@ -1,8 +1,9 @@
# libp2p-swarm # libp2p-swarm
Transport and protocol upgrade system of *libp2p*. Transport, protocol upgrade and swarm systems of *libp2p*.
This crate contains all the core traits and mechanisms of the transport system of *libp2p*. This crate contains all the core traits and mechanisms of the transport and swarm systems
of *libp2p*.
# The `Transport` trait # The `Transport` trait
@ -27,11 +28,12 @@ multiple times in a row in order to chain as many implementations as you want.
The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on
transports that can receive incoming connections on streams that have been opened with `dial()`. transports that can receive incoming connections on streams that have been opened with `dial()`.
The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of The trait provides the `next_incoming()` method, which returns a future that will resolve to
incoming connections. the next substream that arrives from a dialed node.
> **Note**: This trait is mainly implemented for transports that provide stream muxing > **Note**: This trait is mainly implemented for transports that provide stream muxing
> capabilities. > capabilities, but it can also be implemented in a dummy way by returning an empty
> iterator.
# Connection upgrades # Connection upgrades
@ -57,7 +59,7 @@ A middleware can be applied on a transport by using the `with_upgrade` method of
`Transport` trait. The return value of this method also implements the `Transport` trait, which `Transport` trait. The return value of this method also implements the `Transport` trait, which
means that you can call `dial()` and `listen_on()` on it in order to directly obtain an means that you can call `dial()` and `listen_on()` on it in order to directly obtain an
upgraded connection or a listener that will yield upgraded connections. Similarly, the upgraded connection or a listener that will yield upgraded connections. Similarly, the
`dial_and_listen()` method will automatically apply the upgrade on both the dialer and the `next_incoming()` method will automatically apply the upgrade on both the dialer and the
listener. An error is produced if the remote doesn't support the protocol corresponding to the listener. An error is produced if the remote doesn't support the protocol corresponding to the
connection upgrade. connection upgrade.
@ -100,11 +102,11 @@ implement the `AsyncRead` and `AsyncWrite` traits. This means that that the retu
transport. transport.
However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named
`dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`, `dial`, `listen_on`, and `next_incoming`, which will yield you a `Future` or a `Stream`,
which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific
way to use the protocol. way to use the protocol.
```no_run ```rust
extern crate futures; extern crate futures;
extern crate libp2p_ping; extern crate libp2p_ping;
extern crate libp2p_swarm; extern crate libp2p_swarm;
@ -115,7 +117,6 @@ use futures::Future;
use libp2p_ping::Ping; use libp2p_ping::Ping;
use libp2p_swarm::Transport; use libp2p_swarm::Transport;
# fn main() {
let mut core = tokio_core::reactor::Core::new().unwrap(); let mut core = tokio_core::reactor::Core::new().unwrap();
let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
@ -123,14 +124,13 @@ let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
.with_upgrade(Ping) .with_upgrade(Ping)
// TODO: right now the only available protocol is ping, but we want to replace it with // TODO: right now the only available protocol is ping, but we want to replace it with
// something that is more simple to use // something that is more simple to use
.dial(libp2p_swarm::Multiaddr::new("127.0.0.1:12345").unwrap()).unwrap_or_else(|_| panic!()) .dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
.and_then(|(mut pinger, service)| { .and_then(|(mut pinger, service)| {
pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
}); });
// Runs until the ping arrives. // Runs until the ping arrives.
core.run(ping_finished_future).unwrap(); core.run(ping_finished_future).unwrap();
# }
``` ```
## Grouping protocols ## Grouping protocols
@ -138,3 +138,39 @@ core.run(ping_finished_future).unwrap();
You can use the `.or_upgrade()` method to group multiple upgrades together. The return value You can use the `.or_upgrade()` method to group multiple upgrades together. The return value
also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the
ones supported. ones supported.
# Swarm
Once you have created an object that implements the `Transport` trait, you can put it in a
*swarm*. This is done by calling the `swarm()` freestanding function with the transport
alongside with a function or a closure that will turn the output of the upgrade (usually an
actual protocol, as explained above) into a `Future` producing `()`.
```rust
extern crate futures;
extern crate libp2p_ping;
extern crate libp2p_swarm;
extern crate libp2p_tcp_transport;
extern crate tokio_core;
use futures::Future;
use libp2p_ping::Ping;
use libp2p_swarm::Transport;
let mut core = tokio_core::reactor::Core::new().unwrap();
let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
.with_dummy_muxing();
let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, Ping, |(mut pinger, service), client_addr| {
pinger.ping().map_err(|_| panic!())
.select(service).map_err(|_| panic!())
.map(|_| ())
});
// The `swarm_controller` can then be used to do some operations.
swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap());
// Runs until everything is finished.
core.run(swarm_future).unwrap();
```

View File

@ -21,9 +21,10 @@
// TODO: use this once stable ; for now we just copy-paste the content of the README.md // TODO: use this once stable ; for now we just copy-paste the content of the README.md
//#![doc(include = "../README.md")] //#![doc(include = "../README.md")]
//! Transport and protocol upgrade system of *libp2p*. //! Transport, protocol upgrade and swarm systems of *libp2p*.
//! //!
//! This crate contains all the core traits and mechanisms of the transport system of *libp2p*. //! This crate contains all the core traits and mechanisms of the transport and swarm systems
//! of *libp2p*.
//! //!
//! # The `Transport` trait //! # The `Transport` trait
//! //!
@ -48,11 +49,12 @@
//! The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on //! The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on
//! transports that can receive incoming connections on streams that have been opened with `dial()`. //! transports that can receive incoming connections on streams that have been opened with `dial()`.
//! //!
//! The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of //! The trait provides the `next_incoming()` method, which returns a future that will resolve to
//! incoming connections. //! the next substream that arrives from a dialed node.
//! //!
//! > **Note**: This trait is mainly implemented for transports that provide stream muxing //! > **Note**: This trait is mainly implemented for transports that provide stream muxing
//! > capabilities. //! > capabilities, but it can also be implemented in a dummy way by returning an empty
//! > iterator.
//! //!
//! # Connection upgrades //! # Connection upgrades
//! //!
@ -78,7 +80,7 @@
//! `Transport` trait. The return value of this method also implements the `Transport` trait, which //! `Transport` trait. The return value of this method also implements the `Transport` trait, which
//! means that you can call `dial()` and `listen_on()` on it in order to directly obtain an //! means that you can call `dial()` and `listen_on()` on it in order to directly obtain an
//! upgraded connection or a listener that will yield upgraded connections. Similarly, the //! upgraded connection or a listener that will yield upgraded connections. Similarly, the
//! `dial_and_listen()` method will automatically apply the upgrade on both the dialer and the //! `next_incoming()` method will automatically apply the upgrade on both the dialer and the
//! listener. An error is produced if the remote doesn't support the protocol corresponding to the //! listener. An error is produced if the remote doesn't support the protocol corresponding to the
//! connection upgrade. //! connection upgrade.
//! //!
@ -123,7 +125,7 @@
//! transport. //! transport.
//! //!
//! However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named //! However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named
//! `dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`, //! `dial`, `listen_on`, and `next_incoming`, which will yield you a `Future` or a `Stream`,
//! which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific //! which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific
//! way to use the protocol. //! way to use the protocol.
//! //!
@ -146,7 +148,7 @@
//! .with_upgrade(Ping) //! .with_upgrade(Ping)
//! // TODO: right now the only available protocol is ping, but we want to replace it with //! // TODO: right now the only available protocol is ping, but we want to replace it with
//! // something that is more simple to use //! // something that is more simple to use
//! .dial(libp2p_swarm::Multiaddr::new("127.0.0.1:12345").unwrap()).unwrap_or_else(|_| panic!()) //! .dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
//! .and_then(|(mut pinger, service)| { //! .and_then(|(mut pinger, service)| {
//! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) //! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
//! }); //! });
@ -162,6 +164,43 @@
//! also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the //! also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the
//! ones supported. //! ones supported.
//! //!
//! # Swarm
//!
//! Once you have created an object that implements the `Transport` trait, you can put it in a
//! *swarm*. This is done by calling the `swarm()` freestanding function with the transport
//! alongside with a function or a closure that will turn the output of the upgrade (usually an
//! actual protocol, as explained above) into a `Future` producing `()`.
//!
//! ```no_run
//! extern crate futures;
//! extern crate libp2p_ping;
//! extern crate libp2p_swarm;
//! extern crate libp2p_tcp_transport;
//! extern crate tokio_core;
//!
//! use futures::Future;
//! use libp2p_ping::Ping;
//! use libp2p_swarm::Transport;
//!
//! # fn main() {
//! let mut core = tokio_core::reactor::Core::new().unwrap();
//!
//! let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
//! .with_dummy_muxing();
//!
//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, Ping, |(mut pinger, service), client_addr| {
//! pinger.ping().map_err(|_| panic!())
//! .select(service).map_err(|_| panic!())
//! .map(|_| ())
//! });
//!
//! // The `swarm_controller` can then be used to do some operations.
//! swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap());
//!
//! // Runs until everything is finished.
//! core.run(swarm_future).unwrap();
//! # }
//! ```
extern crate bytes; extern crate bytes;
#[macro_use] #[macro_use]
@ -175,11 +214,14 @@ extern crate tokio_io;
pub extern crate multiaddr; pub extern crate multiaddr;
mod connection_reuse; mod connection_reuse;
pub mod swarm;
pub mod muxing; pub mod muxing;
pub mod transport; pub mod transport;
pub use self::connection_reuse::ConnectionReuse; pub use self::connection_reuse::ConnectionReuse;
pub use self::multiaddr::Multiaddr; pub use self::multiaddr::Multiaddr;
pub use self::muxing::StreamMuxer; pub use self::muxing::StreamMuxer;
pub use self::swarm::{swarm, SwarmController, SwarmFuture};
pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade}; pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade};
pub use self::transport::{Endpoint, SimpleProtocol, MuxedTransport, UpgradeExt}; pub use self::transport::{Endpoint, SimpleProtocol, MuxedTransport, UpgradeExt};
pub use self::transport::{DeniedConnectionUpgrade};

279
libp2p-swarm/src/swarm.rs Normal file
View File

@ -0,0 +1,279 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::io::Error as IoError;
use futures::{IntoFuture, Future, Stream, Async, Poll, future};
use futures::sync::mpsc;
use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode};
/// Creates a swarm.
///
/// Requires an upgraded transport, and a function or closure that will turn the upgrade into a
/// `Future` that produces a `()`.
///
/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to
/// control, and the `Future` must be driven to completion in order for things to work.
///
pub fn swarm<T, C, H, F>(transport: T, upgrade: C, handler: H)
-> (SwarmController<T, C>, SwarmFuture<T, C, H, F::Future>)
where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
C: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
C::NamesIter: Clone, // TODO: not elegant
H: FnMut(C::Output, Multiaddr) -> F,
F: IntoFuture<Item = (), Error = IoError>,
{
let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded();
let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded();
let (new_toprocess_tx, new_toprocess_rx) = mpsc::unbounded();
let upgraded = transport.clone().with_upgrade(upgrade);
let future = SwarmFuture {
upgraded: upgraded.clone(),
handler: handler,
new_listeners: new_listeners_rx,
next_incoming: upgraded.clone().next_incoming(),
listeners: Vec::new(),
listeners_upgrade: Vec::new(),
dialers: Vec::new(),
new_dialers: new_dialers_rx,
to_process: Vec::new(),
new_toprocess: new_toprocess_rx,
};
let controller = SwarmController {
transport: transport,
upgraded: upgraded,
new_listeners: new_listeners_tx,
new_dialers: new_dialers_tx,
new_toprocess: new_toprocess_tx,
};
(controller, future)
}
/// Allows control of what the swarm is doing.
pub struct SwarmController<T, C>
where T: MuxedTransport + 'static, // TODO: 'static :-/
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
{
transport: T,
upgraded: UpgradedNode<T, C>,
new_listeners: mpsc::UnboundedSender<Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError>>, Multiaddr), Error = IoError>>>,
new_dialers: mpsc::UnboundedSender<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
new_toprocess: mpsc::UnboundedSender<Box<Future<Item = (), Error = IoError>>>,
}
impl<T, C> SwarmController<T, C>
where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
C: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
C::NamesIter: Clone, // TODO: not elegant
{
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
/// upgraded using the `upgrade`, and the output is sent to the handler that was passed when
/// calling `swarm`.
// TODO: consider returning a future so that errors can be processed?
pub fn dial_to_handler<Du>(&self, multiaddr: Multiaddr, upgrade: Du) -> Result<(), Multiaddr>
where Du: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
Du::Output: Into<C::Output>,
{
match self.transport.clone().with_upgrade(upgrade).dial(multiaddr.clone()) {
Ok(dial) => {
let dial = Box::new(dial.map(Into::into)) as Box<Future<Item = _, Error = _>>;
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_dialers.unbounded_send((dial, multiaddr));
Ok(())
},
Err((_, multiaddr)) => {
Err(multiaddr)
},
}
}
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
/// upgraded using the `upgrade`, and the output is then passed to `and_then`.
///
/// Contrary to `dial_to_handler`, the output of the upgrade is not given to the handler that
/// was passed at initialization.
// TODO: consider returning a future so that errors can be processed?
pub fn dial_custom_handler<Du, Df, Dfu>(&self, multiaddr: Multiaddr, upgrade: Du, and_then: Df)
-> Result<(), Multiaddr>
where Du: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
Df: FnOnce(Du::Output) -> Dfu + 'static, // TODO: 'static :-/
Dfu: IntoFuture<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
{
match self.transport.clone().with_upgrade(upgrade).dial(multiaddr) {
Ok(dial) => {
let dial = Box::new(dial.and_then(and_then)) as Box<_>;
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_toprocess.unbounded_send(dial);
Ok(())
},
Err((_, multiaddr)) => {
Err(multiaddr)
},
}
}
/// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that
/// was passed to `swarm`.
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
match self.upgraded.clone().listen_on(multiaddr) {
Ok((listener, new_addr)) => {
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_listeners.unbounded_send(listener);
Ok(new_addr)
},
Err((_, multiaddr)) => {
Err(multiaddr)
},
}
}
}
/// Future that must be driven to completion in order for the swarm to work.
pub struct SwarmFuture<T, C, H, F>
where T: MuxedTransport + 'static, // TODO: 'static :-/
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
{
upgraded: UpgradedNode<T, C>,
handler: H,
new_listeners: mpsc::UnboundedReceiver<Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError>>, Multiaddr), Error = IoError>>>,
next_incoming: Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
listeners: Vec<Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError>>, Multiaddr), Error = IoError>>>,
listeners_upgrade: Vec<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
dialers: Vec<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
new_dialers: mpsc::UnboundedReceiver<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
to_process: Vec<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
}
impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
C: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
C::NamesIter: Clone, // TODO: not elegant
H: FnMut(C::Output, Multiaddr) -> If,
If: IntoFuture<Future = F, Item = (), Error = IoError>,
F: Future<Item = (), Error = IoError>,
{
type Item = ();
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let handler = &mut self.handler;
match self.next_incoming.poll() {
Ok(Async::Ready((connec, client_addr))) => {
self.next_incoming = self.upgraded.clone().next_incoming();
self.to_process.push(future::Either::A(handler(connec, client_addr).into_future()));
},
Ok(Async::NotReady) => {},
// TODO: may not be the best idea because we're killing the whole server
Err(err) => return Err(err),
};
match self.new_listeners.poll() {
Ok(Async::Ready(Some(new_listener))) => {
self.listeners.push(new_listener);
},
Ok(Async::Ready(None)) | Err(_) => {
// New listener sender has been closed.
},
Ok(Async::NotReady) => {},
};
match self.new_dialers.poll() {
Ok(Async::Ready(Some((new_dialer, multiaddr)))) => {
self.dialers.push((new_dialer, multiaddr));
},
Ok(Async::Ready(None)) | Err(_) => {
// New dialers sender has been closed.
},
Ok(Async::NotReady) => {},
};
match self.new_toprocess.poll() {
Ok(Async::Ready(Some(new_toprocess))) => {
self.to_process.push(future::Either::B(new_toprocess));
},
Ok(Async::Ready(None)) | Err(_) => {
// New to-process sender has been closed.
},
Ok(Async::NotReady) => {},
};
for n in (0 .. self.listeners.len()).rev() {
let mut listener = self.listeners.swap_remove(n);
match listener.poll() {
Ok(Async::Ready(Some((upgrade, client_addr)))) => {
self.listeners.push(listener);
self.listeners_upgrade.push((upgrade, client_addr));
},
Ok(Async::NotReady) => {
self.listeners.push(listener);
},
Ok(Async::Ready(None)) => {},
Err(err) => return Err(err),
};
}
for n in (0 .. self.listeners_upgrade.len()).rev() {
let (mut upgrade, addr) = self.listeners_upgrade.swap_remove(n);
match upgrade.poll() {
Ok(Async::Ready(output)) => {
self.to_process.push(future::Either::A(handler(output, addr).into_future()));
},
Ok(Async::NotReady) => {
self.listeners_upgrade.push((upgrade, addr));
},
Err(err) => return Err(err),
}
}
for n in (0 .. self.dialers.len()).rev() {
let (mut dialer, addr) = self.dialers.swap_remove(n);
match dialer.poll() {
Ok(Async::Ready(output)) => {
self.to_process.push(future::Either::A(handler(output, addr).into_future()));
},
Ok(Async::NotReady) => {
self.dialers.push((dialer, addr));
},
Err(err) => return Err(err),
}
}
for n in (0 .. self.to_process.len()).rev() {
let mut to_process = self.to_process.swap_remove(n);
match to_process.poll() {
Ok(Async::Ready(())) => {},
Ok(Async::NotReady) => self.to_process.push(to_process),
Err(err) => return Err(err),
}
}
// TODO: we never return `Ok(Ready)` because there's no way to know whether
// `next_incoming()` can produce anything more in the future
Ok(Async::NotReady)
}
}

View File

@ -118,6 +118,18 @@ pub trait Transport {
upgrade: upgrade, upgrade: upgrade,
} }
} }
/// Builds a dummy implementation of `MuxedTransport` that uses this transport.
///
/// The resulting object will not actually use muxing. This means that dialing the same node
/// twice will result in two different connections instead of two substreams on the same
/// connection.
#[inline]
fn with_dummy_muxing(self) -> DummyMuxing<Self>
where Self: Sized
{
DummyMuxing { inner: self }
}
} }
/// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which /// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which
@ -522,6 +534,29 @@ pub enum Endpoint {
Listener, Listener,
} }
/// Implementation of `ConnectionUpgrade` that always fails to negotiate.
#[derive(Debug, Copy, Clone)]
pub struct DeniedConnectionUpgrade;
impl<C> ConnectionUpgrade<C> for DeniedConnectionUpgrade
where C: AsyncRead + AsyncWrite
{
type NamesIter = iter::Empty<(Bytes, ())>;
type UpgradeIdentifier = (); // TODO: could use `!`
type Output = (); // TODO: could use `!`
type Future = Box<Future<Item = (), Error = IoError>>; // TODO: could use `!`
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::empty()
}
#[inline]
fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint) -> Self::Future {
unreachable!("the denied connection upgrade always fails to negotiate")
}
}
/// Extension trait for `ConnectionUpgrade`. Automatically implemented on everything. /// Extension trait for `ConnectionUpgrade`. Automatically implemented on everything.
pub trait UpgradeExt { pub trait UpgradeExt {
/// Builds a struct that will choose an upgrade between `self` and `other`, depending on what /// Builds a struct that will choose an upgrade between `self` and `other`, depending on what
@ -684,6 +719,53 @@ where
} }
} }
/// Dummy implementation of `MuxedTransport` that uses an inner `Transport`.
#[derive(Debug, Copy, Clone)]
pub struct DummyMuxing<T> {
inner: T,
}
impl<T> MuxedTransport for DummyMuxing<T>
where T: Transport
{
type Incoming = future::Empty<(T::RawConn, Multiaddr), IoError>;
fn next_incoming(self) -> Self::Incoming
where Self: Sized
{
future::empty()
}
}
impl<T> Transport for DummyMuxing<T>
where T: Transport
{
type RawConn = T::RawConn;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = T::Dial;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where
Self: Sized
{
self.inner.listen_on(addr).map_err(|(inner, addr)| {
(DummyMuxing { inner }, addr)
})
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
where
Self: Sized
{
self.inner.dial(addr).map_err(|(inner, addr)| {
(DummyMuxing { inner }, addr)
})
}
}
/// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received /// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received
/// connection. /// connection.
/// ///
@ -752,7 +834,9 @@ where
/// This function returns the next incoming substream. You are strongly encouraged to call it /// This function returns the next incoming substream. You are strongly encouraged to call it
/// if you have a muxed transport. /// if you have a muxed transport.
pub fn next_incoming(self) -> Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a> pub fn next_incoming(self) -> Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>
where T: MuxedTransport where T: MuxedTransport,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{ {
let upgrade = self.upgrade; let upgrade = self.upgrade;
@ -760,8 +844,8 @@ where
// Try to negotiate the protocol. // Try to negotiate the protocol.
.and_then(move |(connection, addr)| { .and_then(move |(connection, addr)| {
let iter = upgrade.protocol_names() let iter = upgrade.protocol_names()
.map(|(name, id)| (name, <Bytes as PartialEq>::eq, id)); .map::<_, fn(_) -> _>(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
let negotiated = multistream_select::dialer_select_proto(connection, iter) let negotiated = multistream_select::listener_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err)); .map_err(|err| IoError::new(IoErrorKind::Other, err));
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr))
}) })

View File

@ -169,33 +169,32 @@ mod tests {
fn multiaddr_to_tcp_conversion() { fn multiaddr_to_tcp_conversion() {
use std::net::Ipv6Addr; use std::net::Ipv6Addr;
assert!(multiaddr_to_socketaddr(&Multiaddr::new("/ip4/127.0.0.1/udp/1234").unwrap()).is_err()); assert!(multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap()).is_err());
assert_eq!( assert_eq!(
multiaddr_to_socketaddr(&Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap()), multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
Ok(SocketAddr::new( Ok(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
12345, 12345,
)) ))
); );
assert_eq!( assert_eq!(
multiaddr_to_socketaddr(&Multiaddr::new("/ip4/255.255.255.255/tcp/8080").unwrap()), multiaddr_to_socketaddr(&"/ip4/255.255.255.255/tcp/8080".parse::<Multiaddr>().unwrap()),
Ok(SocketAddr::new( Ok(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
8080, 8080,
)) ))
); );
assert_eq!( assert_eq!(
multiaddr_to_socketaddr(&Multiaddr::new("/ip6/::1/tcp/12345").unwrap()), multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
Ok(SocketAddr::new( Ok(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
12345, 12345,
)) ))
); );
assert_eq!( assert_eq!(
multiaddr_to_socketaddr(&Multiaddr::new( multiaddr_to_socketaddr(&"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080", .parse::<Multiaddr>().unwrap()),
).unwrap()),
Ok(SocketAddr::new( Ok(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new( IpAddr::V6(Ipv6Addr::new(
65535, 65535,
@ -218,7 +217,7 @@ mod tests {
std::thread::spawn(move || { std::thread::spawn(move || {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); let addr = "/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap();
let tcp = TcpConfig::new(core.handle()); let tcp = TcpConfig::new(core.handle());
let handle = core.handle(); let handle = core.handle();
let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| { let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| {
@ -239,7 +238,7 @@ mod tests {
core.run(listener).unwrap(); core.run(listener).unwrap();
}); });
std::thread::sleep(std::time::Duration::from_millis(100)); std::thread::sleep(std::time::Duration::from_millis(100));
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); let addr = "/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap();
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle()); let tcp = TcpConfig::new(core.handle());
// Obtain a future socket through dialing // Obtain a future socket through dialing
@ -262,7 +261,7 @@ mod tests {
let core = Core::new().unwrap(); let core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle()); let tcp = TcpConfig::new(core.handle());
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/0").unwrap(); let addr = "/ip4/127.0.0.1/tcp/0".parse::<Multiaddr>().unwrap();
assert!(addr.to_string().contains("tcp/0")); assert!(addr.to_string().contains("tcp/0"));
let (_, new_addr) = tcp.listen_on(addr).unwrap(); let (_, new_addr) = tcp.listen_on(addr).unwrap();
@ -274,7 +273,7 @@ mod tests {
let core = Core::new().unwrap(); let core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle()); let tcp = TcpConfig::new(core.handle());
let addr = Multiaddr::new("/ip6/::1/tcp/0").unwrap(); let addr: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap();
assert!(addr.to_string().contains("tcp/0")); assert!(addr.to_string().contains("tcp/0"));
let (_, new_addr) = tcp.listen_on(addr).unwrap(); let (_, new_addr) = tcp.listen_on(addr).unwrap();
@ -286,7 +285,7 @@ mod tests {
let core = Core::new().unwrap(); let core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle()); let tcp = TcpConfig::new(core.handle());
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345/tcp/12345").unwrap(); let addr = "/ip4/127.0.0.1/tcp/12345/tcp/12345".parse::<Multiaddr>().unwrap();
assert!(tcp.listen_on(addr).is_err()); assert!(tcp.listen_on(addr).is_err());
} }
} }

View File

@ -38,7 +38,7 @@ extern crate multiaddr;
use multiaddr::{Multiaddr, ToMultiaddr}; use multiaddr::{Multiaddr, ToMultiaddr};
let address = Multiaddr::new("/ip4/127.0.0.1/udp/1234").unwrap(); let address = "/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap();
// or directly from a string // or directly from a string
let other = "/ip4/127.0.0.1".to_multiaddr().unwrap(); let other = "/ip4/127.0.0.1".to_multiaddr().unwrap();

View File

@ -38,7 +38,7 @@ impl fmt::Display for Multiaddr {
/// ``` /// ```
/// use multiaddr::Multiaddr; /// use multiaddr::Multiaddr;
/// ///
/// let address = Multiaddr::new("/ip4/127.0.0.1/udt").unwrap(); /// let address: Multiaddr = "/ip4/127.0.0.1/udt".parse().unwrap();
/// assert_eq!(address.to_string(), "/ip4/127.0.0.1/udt"); /// assert_eq!(address.to_string(), "/ip4/127.0.0.1/udt");
/// ``` /// ```
/// ///
@ -52,51 +52,6 @@ impl fmt::Display for Multiaddr {
} }
impl Multiaddr { impl Multiaddr {
/// Create a new multiaddr based on a string representation, like
/// `/ip4/127.0.0.1/udp/1234`.
///
/// # Examples
///
/// Simple construction
///
/// ```
/// use multiaddr::Multiaddr;
///
/// let address = Multiaddr::new("/ip4/127.0.0.1/udp/1234").unwrap();
/// assert_eq!(address.to_bytes(), [
/// 4, 127, 0, 0, 1,
/// 17, 4, 210
/// ]);
/// ```
///
#[deprecated(note = "Use `string.parse()` instead")]
pub fn new(input: &str) -> Result<Multiaddr> {
let mut bytes = Vec::new();
let mut parts = input.split('/');
// A multiaddr must start with `/`
if !parts.next().ok_or(Error::InvalidMultiaddr)?.is_empty() {
return Err(Error::InvalidMultiaddr);
}
while let Some(part) = parts.next() {
let protocol: ProtocolId = part.parse()?;
let addr_component = match protocol.size() {
ProtocolArgSize::Fixed { bytes: 0 } => {
protocol.parse_data("")? // TODO: bad design
},
_ => {
let data = parts.next().ok_or(Error::MissingAddress)?;
protocol.parse_data(data)?
},
};
addr_component.write_bytes(&mut bytes).expect("writing to a Vec never fails");
}
Ok(Multiaddr { bytes: bytes })
}
/// Return a copy to disallow changing the bytes directly /// Return a copy to disallow changing the bytes directly
pub fn to_bytes(&self) -> Vec<u8> { pub fn to_bytes(&self) -> Vec<u8> {
self.bytes.to_owned() self.bytes.to_owned()
@ -116,7 +71,7 @@ impl Multiaddr {
/// ``` /// ```
/// use multiaddr::{Multiaddr, ProtocolId}; /// use multiaddr::{Multiaddr, ProtocolId};
/// ///
/// let address = Multiaddr::new("/ip4/127.0.0.1").unwrap(); /// let address: Multiaddr = "/ip4/127.0.0.1".parse().unwrap();
/// assert_eq!(address.protocol(), vec![ProtocolId::IP4]); /// assert_eq!(address.protocol(), vec![ProtocolId::IP4]);
/// ``` /// ```
/// ///
@ -133,9 +88,9 @@ impl Multiaddr {
/// ``` /// ```
/// use multiaddr::Multiaddr; /// use multiaddr::Multiaddr;
/// ///
/// let address = Multiaddr::new("/ip4/127.0.0.1").unwrap(); /// let address: Multiaddr = "/ip4/127.0.0.1".parse().unwrap();
/// let nested = address.encapsulate("/udt").unwrap(); /// let nested = address.encapsulate("/udt").unwrap();
/// assert_eq!(nested, Multiaddr::new("/ip4/127.0.0.1/udt").unwrap()); /// assert_eq!(nested, "/ip4/127.0.0.1/udt".parse().unwrap());
/// ``` /// ```
/// ///
pub fn encapsulate<T: ToMultiaddr>(&self, input: T) -> Result<Multiaddr> { pub fn encapsulate<T: ToMultiaddr>(&self, input: T) -> Result<Multiaddr> {
@ -154,9 +109,9 @@ impl Multiaddr {
/// ``` /// ```
/// use multiaddr::{Multiaddr, ToMultiaddr}; /// use multiaddr::{Multiaddr, ToMultiaddr};
/// ///
/// let address = Multiaddr::new("/ip4/127.0.0.1/udt/sctp/5678").unwrap(); /// let address: Multiaddr = "/ip4/127.0.0.1/udt/sctp/5678".parse().unwrap();
/// let unwrapped = address.decapsulate("/udt").unwrap(); /// let unwrapped = address.decapsulate("/udt").unwrap();
/// assert_eq!(unwrapped, Multiaddr::new("/ip4/127.0.0.1").unwrap()); /// assert_eq!(unwrapped, "/ip4/127.0.0.1".parse().unwrap());
/// ///
/// assert_eq!( /// assert_eq!(
/// address.decapsulate("/udt").unwrap(), /// address.decapsulate("/udt").unwrap(),
@ -281,8 +236,31 @@ impl FromStr for Multiaddr {
type Err = Error; type Err = Error;
#[inline] #[inline]
fn from_str(s: &str) -> Result<Self> { fn from_str(input: &str) -> Result<Self> {
Multiaddr::new(s) let mut bytes = Vec::new();
let mut parts = input.split('/');
// A multiaddr must start with `/`
if !parts.next().ok_or(Error::InvalidMultiaddr)?.is_empty() {
return Err(Error::InvalidMultiaddr);
}
while let Some(part) = parts.next() {
let protocol: ProtocolId = part.parse()?;
let addr_component = match protocol.size() {
ProtocolArgSize::Fixed { bytes: 0 } => {
protocol.parse_data("")? // TODO: bad design
},
_ => {
let data = parts.next().ok_or(Error::MissingAddress)?;
protocol.parse_data(data)?
},
};
addr_component.write_bytes(&mut bytes).expect("writing to a Vec never fails");
}
Ok(Multiaddr { bytes: bytes })
} }
} }
@ -337,14 +315,14 @@ impl ToMultiaddr for SocketAddr {
impl ToMultiaddr for SocketAddrV4 { impl ToMultiaddr for SocketAddrV4 {
fn to_multiaddr(&self) -> Result<Multiaddr> { fn to_multiaddr(&self) -> Result<Multiaddr> {
Multiaddr::new(&format!("/ip4/{}/tcp/{}", self.ip(), self.port())) format!("/ip4/{}/tcp/{}", self.ip(), self.port()).parse()
} }
} }
impl ToMultiaddr for SocketAddrV6 { impl ToMultiaddr for SocketAddrV6 {
fn to_multiaddr(&self) -> Result<Multiaddr> { fn to_multiaddr(&self) -> Result<Multiaddr> {
// TODO: Should how should we handle `flowinfo` and `scope_id`? // TODO: Should how should we handle `flowinfo` and `scope_id`?
Multiaddr::new(&format!("/ip6/{}/tcp/{}", self.ip(), self.port())) format!("/ip6/{}/tcp/{}", self.ip(), self.port()).parse()
} }
} }
@ -359,25 +337,25 @@ impl ToMultiaddr for IpAddr {
impl ToMultiaddr for Ipv4Addr { impl ToMultiaddr for Ipv4Addr {
fn to_multiaddr(&self) -> Result<Multiaddr> { fn to_multiaddr(&self) -> Result<Multiaddr> {
Multiaddr::new(&format!("/ip4/{}", &self)) format!("/ip4/{}", &self).parse()
} }
} }
impl ToMultiaddr for Ipv6Addr { impl ToMultiaddr for Ipv6Addr {
fn to_multiaddr(&self) -> Result<Multiaddr> { fn to_multiaddr(&self) -> Result<Multiaddr> {
Multiaddr::new(&format!("/ip6/{}", &self)) format!("/ip6/{}", &self).parse()
} }
} }
impl ToMultiaddr for String { impl ToMultiaddr for String {
fn to_multiaddr(&self) -> Result<Multiaddr> { fn to_multiaddr(&self) -> Result<Multiaddr> {
Multiaddr::new(self) self.parse()
} }
} }
impl<'a> ToMultiaddr for &'a str { impl<'a> ToMultiaddr for &'a str {
fn to_multiaddr(&self) -> Result<Multiaddr> { fn to_multiaddr(&self) -> Result<Multiaddr> {
Multiaddr::new(self) self.parse()
} }
} }

View File

@ -17,20 +17,20 @@ fn protocol_to_name() {
fn assert_bytes(source: &str, target: &str, protocols: Vec<ProtocolId>) -> () { fn assert_bytes(source: &str, target: &str, protocols: Vec<ProtocolId>) -> () {
let address = Multiaddr::new(source).unwrap(); let address = source.parse::<Multiaddr>().unwrap();
assert_eq!(hex::encode(address.to_bytes().as_slice()), target); assert_eq!(hex::encode(address.to_bytes().as_slice()), target);
assert_eq!(address.protocol(), protocols); assert_eq!(address.iter().map(|addr| addr.protocol_id()).collect::<Vec<_>>(), protocols);
} }
fn ma_valid(source: &str, target: &str, protocols: Vec<ProtocolId>) -> () { fn ma_valid(source: &str, target: &str, protocols: Vec<ProtocolId>) -> () {
assert_bytes(source, target, protocols); assert_bytes(source, target, protocols);
assert_eq!(Multiaddr::new(source).unwrap().to_string(), source); assert_eq!(source.parse::<Multiaddr>().unwrap().to_string(), source);
} }
#[test] #[test]
fn multiaddr_eq() { fn multiaddr_eq() {
let m1 = Multiaddr::new("/ip4/127.0.0.1/udp/1234").unwrap(); let m1 = "/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap();
let m2 = Multiaddr::new("/ip4/127.0.0.1/tcp/1234").unwrap(); let m2 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().unwrap();
let m3 = Multiaddr::new("/ip4/127.0.0.1/tcp/1234").unwrap(); let m3 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().unwrap();
assert_ne!(m1, m2); assert_ne!(m1, m2);
assert_ne!(m2, m1); assert_ne!(m2, m1);
@ -135,7 +135,7 @@ fn construct_fail() {
"/p2p-circuit/50"]; "/p2p-circuit/50"];
for address in &addresses { for address in &addresses {
assert!(Multiaddr::new(address).is_err(), address.to_string()); assert!(address.parse::<Multiaddr>().is_err(), address.to_string());
} }
} }
@ -143,17 +143,17 @@ fn construct_fail() {
#[test] #[test]
fn to_multiaddr() { fn to_multiaddr() {
assert_eq!(Ipv4Addr::new(127, 0, 0, 1).to_multiaddr().unwrap(), assert_eq!(Ipv4Addr::new(127, 0, 0, 1).to_multiaddr().unwrap(),
Multiaddr::new("/ip4/127.0.0.1").unwrap()); "/ip4/127.0.0.1".parse::<Multiaddr>().unwrap());
assert_eq!(Ipv6Addr::new(0x2601, 0x9, 0x4f81, 0x9700, 0x803e, 0xca65, 0x66e8, 0xc21) assert_eq!(Ipv6Addr::new(0x2601, 0x9, 0x4f81, 0x9700, 0x803e, 0xca65, 0x66e8, 0xc21)
.to_multiaddr() .to_multiaddr()
.unwrap(), .unwrap(),
Multiaddr::new("/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21").unwrap()); "/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21".parse::<Multiaddr>().unwrap());
assert_eq!("/ip4/127.0.0.1/tcp/1234".to_string().to_multiaddr().unwrap(), assert_eq!("/ip4/127.0.0.1/tcp/1234".to_string().to_multiaddr().unwrap(),
Multiaddr::new("/ip4/127.0.0.1/tcp/1234").unwrap()); "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().unwrap());
assert_eq!("/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21".to_multiaddr().unwrap(), assert_eq!("/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21".to_multiaddr().unwrap(),
Multiaddr::new("/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21").unwrap()); "/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21".parse::<Multiaddr>().unwrap());
assert_eq!(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 1234).to_multiaddr().unwrap(), assert_eq!(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 1234).to_multiaddr().unwrap(),
Multiaddr::new("/ip4/127.0.0.1/tcp/1234").unwrap()); "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().unwrap());
assert_eq!(SocketAddrV6::new(Ipv6Addr::new(0x2601, assert_eq!(SocketAddrV6::new(Ipv6Addr::new(0x2601,
0x9, 0x9,
0x4f81, 0x4f81,
@ -167,5 +167,5 @@ fn to_multiaddr() {
0) 0)
.to_multiaddr() .to_multiaddr()
.unwrap(), .unwrap(),
Multiaddr::new("/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/tcp/1234").unwrap()); "/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/tcp/1234".parse::<Multiaddr>().unwrap());
} }

View File

@ -123,34 +123,48 @@ pub trait EncoderHelper: Sized {
/// Helper trait to allow multiple integer types to be encoded /// Helper trait to allow multiple integer types to be encoded
pub trait DecoderHelper: Sized { pub trait DecoderHelper: Sized {
/// Decode a single byte /// Decode a single byte
fn decode_one(decoder: &mut DecoderState<Self>, byte: u8) -> Option<Self>; fn decode_one(decoder: &mut DecoderState<Self>, byte: u8) -> errors::Result<Option<Self>>;
/// Read as much of the varint as possible /// Read as much of the varint as possible
fn read<R: AsyncRead>(decoder: &mut DecoderState<Self>, input: R) -> Poll<Option<Self>, Error>; fn read<R: AsyncRead>(decoder: &mut DecoderState<Self>, input: R) -> Poll<Option<Self>, Error>;
} }
macro_rules! impl_decoderstate { macro_rules! impl_decoderstate {
($t:ty) => { impl_decoderstate!($t, <$t>::from, |v| v); }; ($t:ty) => {
impl_decoderstate!(
$t,
|a| a as $t,
|a: $t, b| -> Option<$t> { a.checked_shl(b as u32) }
);
};
($t:ty, $make_fn:expr) => { impl_decoderstate!($t, $make_fn, $make_fn); }; ($t:ty, $make_fn:expr) => { impl_decoderstate!($t, $make_fn, $make_fn); };
($t:ty, $make_fn:expr, $make_shift_fn:expr) => { ($t:ty, $make_fn:expr, $shift_fn:expr) => {
impl DecoderHelper for $t { impl DecoderHelper for $t {
#[inline] #[inline]
fn decode_one(decoder: &mut DecoderState<Self>, byte: u8) -> Option<$t> { fn decode_one(decoder: &mut DecoderState<Self>, byte: u8) -> ::errors::Result<Option<$t>> {
decoder.accumulator.take().and_then(|accumulator| { let res = decoder.accumulator.take().and_then(|accumulator| {
let out = accumulator | let out = accumulator | match $shift_fn(
( $make_fn(byte & 0x7F),
$make_fn(byte & 0x7F) << decoder.shift * USABLE_BITS_PER_BYTE,
$make_shift_fn(decoder.shift * USABLE_BITS_PER_BYTE) ) {
); Some(a) => a,
None => return Some(Err(ErrorKind::ParseError.into())),
};
decoder.shift += 1; decoder.shift += 1;
if byte & 0x80 == 0 { if byte & 0x80 == 0 {
Some(out) Some(Ok(out))
} else { } else {
decoder.accumulator = AccumulatorState::InProgress(out); decoder.accumulator = AccumulatorState::InProgress(out);
None None
} }
}) });
match res {
Some(Ok(number)) => Ok(Some(number)),
Some(Err(err)) => Err(err),
None => Ok(None),
}
} }
fn read<R: AsyncRead>( fn read<R: AsyncRead>(
@ -173,7 +187,7 @@ macro_rules! impl_decoderstate {
match input.read_exact(&mut buffer) { match input.read_exact(&mut buffer) {
Ok(()) => { Ok(()) => {
if let Some(out) = Self::decode_one(decoder, buffer[0]) { if let Some(out) = Self::decode_one(decoder, buffer[0])? {
break Ok(Async::Ready(Some(out))); break Ok(Async::Ready(Some(out)));
} }
} }
@ -258,9 +272,9 @@ impl_encoderstate!(u64, (|val| val as u64));
impl_encoderstate!(u32, (|val| val as u32)); impl_encoderstate!(u32, (|val| val as u32));
impl_decoderstate!(usize); impl_decoderstate!(usize);
impl_decoderstate!(BigUint); impl_decoderstate!(BigUint, BigUint::from, |a, b| Some(a << b));
impl_decoderstate!(u64, (|val| val as u64)); impl_decoderstate!(u64);
impl_decoderstate!(u32, (|val| val as u32)); impl_decoderstate!(u32);
impl<T> EncoderState<T> { impl<T> EncoderState<T> {
pub fn source(&self) -> &T { pub fn source(&self) -> &T {
@ -368,7 +382,8 @@ impl<T: Default + DecoderHelper> Decoder for VarintDecoder<T> {
// We know that the length is not 0, so this cannot fail. // We know that the length is not 0, so this cannot fail.
let first_byte = src.split_to(1)[0]; let first_byte = src.split_to(1)[0];
let mut state = self.state.take().unwrap_or_default(); let mut state = self.state.take().unwrap_or_default();
let out = T::decode_one(&mut state, first_byte); let out = T::decode_one(&mut state, first_byte)
.map_err(|_| io::Error::from(io::ErrorKind::Other))?;
if let Some(out) = out { if let Some(out) = out {
break Ok(Some(out)); break Ok(Some(out));
@ -390,10 +405,12 @@ pub fn decode<R: Read, T: Default + DecoderHelper>(mut input: R) -> errors::Resu
match input.read_exact(&mut buffer) { match input.read_exact(&mut buffer) {
Ok(()) => { Ok(()) => {
if let Some(out) = T::decode_one(&mut decoder, buffer[0]) { if let Some(out) = T::decode_one(&mut decoder, buffer[0])
break Ok(out); .map_err(|_| io::Error::from(io::ErrorKind::Other))?
} {
} break Ok(out);
}
}
Err(inner) => break Err(Error::with_chain(inner, ErrorKind::ParseError)), Err(inner) => break Err(Error::with_chain(inner, ErrorKind::ParseError)),
} }
} }
@ -417,6 +434,34 @@ mod tests {
use num_bigint::BigUint; use num_bigint::BigUint;
use futures::{Future, Stream}; use futures::{Future, Stream};
#[test]
fn large_number_fails() {
use std::io::Cursor;
use futures::Async;
use super::WriteState;
let mut out = vec![0u8; 10];
{
let writable: Cursor<&mut [_]> = Cursor::new(&mut out);
let mut state = EncoderState::new(::std::u64::MAX);
assert_eq!(
state.write(writable).unwrap(),
Async::Ready(WriteState::Done(10))
);
}
let result: Result<Option<u32>, _> = FramedRead::new(&out[..], VarintDecoder::new())
.into_future()
.map(|(out, _)| out)
.map_err(|(out, _)| out)
.wait();
assert!(result.is_err());
}
#[test] #[test]
fn can_decode_basic_biguint() { fn can_decode_basic_biguint() {
assert_eq!( assert_eq!(