mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-24 23:31:33 +00:00
Rework libp2p-identify (#116)
* Add a proper PeerId to Peerstore * Turn identify into a transport layer * Expose the dialed multiaddress * Add identified nodes to the peerstore * Allow configuring the TTL of the addresses * Split identify in two modules * Some comments and tweaks * Run rustfmt * Add test and bugfix * Fix wrong address reported when dialing * Fix websocket browser code * Support the p2p protocol in libp2p-identify * Fix concerns * Fix libp2p-dns * More concerns
This commit is contained in:
@ -99,7 +99,7 @@ fn main() {
|
|||||||
// We now use the controller to dial to the address.
|
// We now use the controller to dial to the address.
|
||||||
let (finished_tx, finished_rx) = oneshot::channel();
|
let (finished_tx, finished_rx) = oneshot::channel();
|
||||||
swarm_controller
|
swarm_controller
|
||||||
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), proto, |echo| {
|
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), proto, |echo, _| {
|
||||||
// `echo` is what the closure used when initializing `proto` returns.
|
// `echo` is what the closure used when initializing `proto` returns.
|
||||||
// Consequently, please note that the `send` method is available only because the type
|
// Consequently, please note that the `send` method is available only because the type
|
||||||
// `length_delimited::Framed` has a `send` method.
|
// `length_delimited::Framed` has a `send` method.
|
||||||
|
@ -83,7 +83,7 @@ fn main() {
|
|||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
swarm_controller
|
swarm_controller
|
||||||
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), ping::Ping,
|
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), ping::Ping,
|
||||||
|(mut pinger, future)| {
|
|(mut pinger, future), _| {
|
||||||
let ping = pinger.ping().map_err(|_| unreachable!()).inspect(|_| {
|
let ping = pinger.ping().map_err(|_| unreachable!()).inspect(|_| {
|
||||||
println!("Received pong from the remote");
|
println!("Received pong from the remote");
|
||||||
let _ = tx.send(());
|
let _ = tx.send(());
|
||||||
|
@ -102,7 +102,7 @@ where
|
|||||||
type RawConn = T::RawConn;
|
type RawConn = T::RawConn;
|
||||||
type Listener = T::Listener;
|
type Listener = T::Listener;
|
||||||
type ListenerUpgrade = T::ListenerUpgrade;
|
type ListenerUpgrade = T::ListenerUpgrade;
|
||||||
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
type Dial = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
@ -238,7 +238,7 @@ mod tests {
|
|||||||
type RawConn = <TcpConfig as Transport>::RawConn;
|
type RawConn = <TcpConfig as Transport>::RawConn;
|
||||||
type Listener = <TcpConfig as Transport>::Listener;
|
type Listener = <TcpConfig as Transport>::Listener;
|
||||||
type ListenerUpgrade = <TcpConfig as Transport>::ListenerUpgrade;
|
type ListenerUpgrade = <TcpConfig as Transport>::ListenerUpgrade;
|
||||||
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
type Dial = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn listen_on(
|
fn listen_on(
|
||||||
|
@ -23,200 +23,60 @@
|
|||||||
//!
|
//!
|
||||||
//! When two nodes connect to each other, the listening half sends a message to the dialing half,
|
//! When two nodes connect to each other, the listening half sends a message to the dialing half,
|
||||||
//! indicating the information, and then the protocol stops.
|
//! indicating the information, and then the protocol stops.
|
||||||
|
//!
|
||||||
|
//! # Usage
|
||||||
|
//!
|
||||||
|
//! Both low-level and high-level usages are available.
|
||||||
|
//!
|
||||||
|
//! ## High-level usage through the `IdentifyTransport` struct
|
||||||
|
//!
|
||||||
|
//! This crate provides the `IdentifyTransport` struct, which wraps around a `Transport` and an
|
||||||
|
//! implementation of `Peerstore`. `IdentifyTransport` is itself a transport that accepts
|
||||||
|
//! multiaddresses of the form `/p2p/...` or `/ipfs/...`.
|
||||||
|
//!
|
||||||
|
//! > **Note**: All the documentation refers to `/p2p/...`, however `/ipfs/...` is also supported.
|
||||||
|
//!
|
||||||
|
//! If you dial a multiaddr of the form `/p2p/...`, then the `IdentifyTransport` will look into
|
||||||
|
//! the `Peerstore` for any known multiaddress for this peer and try to dial them using the
|
||||||
|
//! underlying transport. If you dial any other multiaddr, then it will dial this multiaddr using
|
||||||
|
//! the underlying transport, then negotiate the *identify* protocol with the remote in order to
|
||||||
|
//! obtain its ID, then add it to the peerstore, and finally dial the same multiaddr again and
|
||||||
|
//! return the connection.
|
||||||
|
//!
|
||||||
|
//! Listening doesn't support multiaddresses of the form `/p2p/...` (because that wouldn't make
|
||||||
|
//! sense). Any address passed to `listen_on` will be passed directly to the underlying transport.
|
||||||
|
//!
|
||||||
|
//! Whenever a remote connects to us, either through listening or through `next_incoming`, the
|
||||||
|
//! `IdentifyTransport` dials back the remote, upgrades the connection to the *identify* protocol
|
||||||
|
//! in order to obtain the ID of the remote, stores the information in the peerstore, and finally
|
||||||
|
//! only returns the connection. From the exterior, the multiaddress of the remote is of the form
|
||||||
|
//! `/p2p/...`. If the remote doesn't support the *identify* protocol, then the socket is closed.
|
||||||
|
//!
|
||||||
|
//! Because of the behaviour of `IdentifyProtocol`, it is recommended to build it on top of a
|
||||||
|
//! `ConnectionReuse`.
|
||||||
|
//!
|
||||||
|
//! ## Low-level usage through the `IdentifyProtocolConfig` struct
|
||||||
|
//!
|
||||||
|
//! The `IdentifyProtocolConfig` struct implements the `ConnectionUpgrade` trait. Using it will
|
||||||
|
//! negotiate the *identify* protocol.
|
||||||
|
//!
|
||||||
|
//! The output of the upgrade is a `IdentifyOutput`. If we are the dialer, then `IdentifyOutput`
|
||||||
|
//! will contain the information sent by the remote. If we are the listener, then it will contain
|
||||||
|
//! a `IdentifySender` struct that can be used to transmit back to the remote the information about
|
||||||
|
//! it.
|
||||||
|
|
||||||
extern crate bytes;
|
extern crate bytes;
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate multiaddr;
|
|
||||||
extern crate libp2p_peerstore;
|
extern crate libp2p_peerstore;
|
||||||
extern crate libp2p_swarm;
|
extern crate libp2p_swarm;
|
||||||
|
extern crate multiaddr;
|
||||||
extern crate protobuf;
|
extern crate protobuf;
|
||||||
extern crate tokio_io;
|
extern crate tokio_io;
|
||||||
extern crate varint;
|
extern crate varint;
|
||||||
|
|
||||||
use bytes::{Bytes, BytesMut};
|
pub use self::protocol::{IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig, IdentifySender};
|
||||||
use futures::{Future, Stream, Sink};
|
pub use self::transport::IdentifyTransport;
|
||||||
use libp2p_swarm::{ConnectionUpgrade, Endpoint};
|
|
||||||
use multiaddr::Multiaddr;
|
|
||||||
use protobuf::Message as ProtobufMessage;
|
|
||||||
use protobuf::core::parse_from_bytes as protobuf_parse_from_bytes;
|
|
||||||
use protobuf::repeated::RepeatedField;
|
|
||||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
|
||||||
use std::iter;
|
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
|
||||||
use varint::VarintCodec;
|
|
||||||
|
|
||||||
|
mod protocol;
|
||||||
mod structs_proto;
|
mod structs_proto;
|
||||||
|
mod transport;
|
||||||
/// Prototype for an upgrade to the identity protocol.
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct IdentifyProtocol {
|
|
||||||
/// Our public key to report to the remote.
|
|
||||||
pub public_key: Vec<u8>,
|
|
||||||
/// Version of the "global" protocol, eg. `ipfs/1.0.0` or `polkadot/1.0.0`.
|
|
||||||
pub protocol_version: String,
|
|
||||||
/// Name and version of the client. Can be thought as similar to the `User-Agent` header
|
|
||||||
/// of HTTP.
|
|
||||||
pub agent_version: String,
|
|
||||||
/// Addresses that we are listening on.
|
|
||||||
pub listen_addrs: Vec<Multiaddr>,
|
|
||||||
/// Protocols supported by us.
|
|
||||||
pub protocols: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Information sent from the listener to the dialer.
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct IdentifyInfo {
|
|
||||||
/// Public key of the node.
|
|
||||||
pub public_key: Vec<u8>,
|
|
||||||
/// Version of the "global" protocol, eg. `ipfs/1.0.0` or `polkadot/1.0.0`.
|
|
||||||
pub protocol_version: String,
|
|
||||||
/// Name and version of the client. Can be thought as similar to the `User-Agent` header
|
|
||||||
/// of HTTP.
|
|
||||||
pub agent_version: String,
|
|
||||||
/// Addresses that the remote is listening on.
|
|
||||||
pub listen_addrs: Vec<Multiaddr>,
|
|
||||||
/// Our own address as reported by the remote.
|
|
||||||
pub observed_addr: Multiaddr,
|
|
||||||
/// Protocols supported by the remote.
|
|
||||||
pub protocols: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C> ConnectionUpgrade<C> for IdentifyProtocol
|
|
||||||
where C: AsyncRead + AsyncWrite + 'static
|
|
||||||
{
|
|
||||||
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
|
|
||||||
type UpgradeIdentifier = ();
|
|
||||||
type Output = Option<IdentifyInfo>;
|
|
||||||
type Future = Box<Future<Item = Self::Output, Error = IoError>>;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn protocol_names(&self) -> Self::NamesIter {
|
|
||||||
iter::once((Bytes::from("/ipfs/id/1.0.0"), ()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn upgrade(self, socket: C, _: (), ty: Endpoint, remote_addr: &Multiaddr) -> Self::Future {
|
|
||||||
let socket = socket.framed(VarintCodec::default());
|
|
||||||
|
|
||||||
match ty {
|
|
||||||
Endpoint::Dialer => {
|
|
||||||
let future = socket.into_future()
|
|
||||||
.map(|(msg, _)| msg)
|
|
||||||
.map_err(|(err, _)| err)
|
|
||||||
.and_then(|msg| if let Some(msg) = msg {
|
|
||||||
Ok(Some(parse_proto_msg(msg)?))
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
});
|
|
||||||
|
|
||||||
Box::new(future) as Box<_>
|
|
||||||
}
|
|
||||||
|
|
||||||
Endpoint::Listener => {
|
|
||||||
let listen_addrs = self.listen_addrs
|
|
||||||
.into_iter()
|
|
||||||
.map(|addr| addr.to_string().into_bytes())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let mut message = structs_proto::Identify::new();
|
|
||||||
message.set_agentVersion(self.agent_version);
|
|
||||||
message.set_protocolVersion(self.protocol_version);
|
|
||||||
message.set_publicKey(self.public_key);
|
|
||||||
message.set_listenAddrs(listen_addrs);
|
|
||||||
message.set_observedAddr(remote_addr.to_string().into_bytes());
|
|
||||||
message.set_protocols(RepeatedField::from_vec(self.protocols));
|
|
||||||
|
|
||||||
let bytes = message.write_to_bytes()
|
|
||||||
.expect("writing protobuf failed ; should never happen");
|
|
||||||
|
|
||||||
// On the server side, after sending the information to the client we make the
|
|
||||||
// future produce a `None`. If we were on the client side, this would contain the
|
|
||||||
// information received by the server.
|
|
||||||
let future = socket.send(bytes).map(|_| None);
|
|
||||||
Box::new(future) as Box<_>
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Turns a protobuf message into an `IdentifyInfo`. If something bad happens, turn it into
|
|
||||||
// an `IoError`.
|
|
||||||
fn parse_proto_msg(msg: BytesMut) -> Result<IdentifyInfo, IoError> {
|
|
||||||
match protobuf_parse_from_bytes::<structs_proto::Identify>(&msg) {
|
|
||||||
Ok(mut msg) => {
|
|
||||||
let listen_addrs = {
|
|
||||||
let mut addrs = Vec::new();
|
|
||||||
for addr in msg.take_listenAddrs().into_iter() {
|
|
||||||
addrs.push(bytes_to_multiaddr(addr)?);
|
|
||||||
}
|
|
||||||
addrs
|
|
||||||
};
|
|
||||||
|
|
||||||
let observed_addr = bytes_to_multiaddr(msg.take_observedAddr())?;
|
|
||||||
|
|
||||||
Ok(IdentifyInfo {
|
|
||||||
public_key: msg.take_publicKey(),
|
|
||||||
protocol_version: msg.take_protocolVersion(),
|
|
||||||
agent_version: msg.take_agentVersion(),
|
|
||||||
listen_addrs: listen_addrs,
|
|
||||||
observed_addr: observed_addr,
|
|
||||||
protocols: msg.take_protocols().into_vec(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(err) => {
|
|
||||||
Err(IoError::new(IoErrorKind::InvalidData, err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Turn a `Vec<u8>` into a `Multiaddr`. If something bad happens, turn it into an `IoError`.
|
|
||||||
fn bytes_to_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, IoError> {
|
|
||||||
String::from_utf8(bytes)
|
|
||||||
.map_err(|err| {
|
|
||||||
IoError::new(IoErrorKind::InvalidData, err)
|
|
||||||
})
|
|
||||||
.and_then(|s| {
|
|
||||||
s.parse()
|
|
||||||
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
extern crate libp2p_tcp_transport;
|
|
||||||
extern crate tokio_core;
|
|
||||||
|
|
||||||
use self::libp2p_tcp_transport::TcpConfig;
|
|
||||||
use self::tokio_core::reactor::Core;
|
|
||||||
use IdentifyProtocol;
|
|
||||||
use futures::{IntoFuture, Future, Stream};
|
|
||||||
use libp2p_swarm::Transport;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn basic() {
|
|
||||||
let mut core = Core::new().unwrap();
|
|
||||||
let tcp = TcpConfig::new(core.handle());
|
|
||||||
let with_proto = tcp.with_upgrade(IdentifyProtocol {
|
|
||||||
public_key: vec![1, 2, 3, 4],
|
|
||||||
protocol_version: "ipfs/1.0.0".to_owned(),
|
|
||||||
agent_version: "agent/version".to_owned(),
|
|
||||||
listen_addrs: vec!["/ip4/5.6.7.8/tcp/12345".parse().unwrap()],
|
|
||||||
protocols: vec!["ping".to_owned(), "kad".to_owned()],
|
|
||||||
});
|
|
||||||
|
|
||||||
let (server, addr) = with_proto.clone()
|
|
||||||
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
|
|
||||||
.unwrap();
|
|
||||||
let server = server.into_future()
|
|
||||||
.map_err(|(err, _)| err)
|
|
||||||
.and_then(|(n, _)| n.unwrap().0);
|
|
||||||
let dialer = with_proto.dial(addr)
|
|
||||||
.unwrap()
|
|
||||||
.into_future();
|
|
||||||
|
|
||||||
let (recv, should_be_empty) = core.run(dialer.join(server)).unwrap();
|
|
||||||
assert!(should_be_empty.is_none());
|
|
||||||
let recv = recv.unwrap();
|
|
||||||
assert_eq!(recv.public_key, &[1, 2, 3, 4]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
283
libp2p-identify/src/protocol.rs
Normal file
283
libp2p-identify/src/protocol.rs
Normal file
@ -0,0 +1,283 @@
|
|||||||
|
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
use bytes::{Bytes, BytesMut};
|
||||||
|
use futures::{future, Future, Sink, Stream};
|
||||||
|
use libp2p_swarm::{ConnectionUpgrade, Endpoint};
|
||||||
|
use multiaddr::Multiaddr;
|
||||||
|
use protobuf::Message as ProtobufMessage;
|
||||||
|
use protobuf::core::parse_from_bytes as protobuf_parse_from_bytes;
|
||||||
|
use protobuf::repeated::RepeatedField;
|
||||||
|
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||||
|
use std::iter;
|
||||||
|
use structs_proto;
|
||||||
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio_io::codec::Framed;
|
||||||
|
use varint::VarintCodec;
|
||||||
|
|
||||||
|
/// Configuration for an upgrade to the identity protocol.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct IdentifyProtocolConfig;
|
||||||
|
|
||||||
|
/// Output of the connection upgrade.
|
||||||
|
pub enum IdentifyOutput<T> {
|
||||||
|
/// We obtained information from the remote. Happens when we are the dialer.
|
||||||
|
RemoteInfo {
|
||||||
|
info: IdentifyInfo,
|
||||||
|
/// Address the remote sees for us.
|
||||||
|
observed_addr: Multiaddr,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// We opened a connection to the remote and need to send it information. Happens when we are
|
||||||
|
/// the listener.
|
||||||
|
Sender {
|
||||||
|
/// Object used to send identify info to the client.
|
||||||
|
sender: IdentifySender<T>,
|
||||||
|
/// Observed multiaddress of the client.
|
||||||
|
observed_addr: Multiaddr,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Object used to send back information to the client.
|
||||||
|
pub struct IdentifySender<T> {
|
||||||
|
inner: Framed<T, VarintCodec<Vec<u8>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> IdentifySender<T>
|
||||||
|
where
|
||||||
|
T: AsyncWrite + 'a,
|
||||||
|
{
|
||||||
|
/// Sends back information to the client. Returns a future that is signalled whenever the
|
||||||
|
/// info have been sent.
|
||||||
|
pub fn send(
|
||||||
|
self,
|
||||||
|
info: IdentifyInfo,
|
||||||
|
observed_addr: &Multiaddr,
|
||||||
|
) -> Box<Future<Item = (), Error = IoError> + 'a> {
|
||||||
|
let listen_addrs = info.listen_addrs
|
||||||
|
.into_iter()
|
||||||
|
.map(|addr| addr.into_bytes())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut message = structs_proto::Identify::new();
|
||||||
|
message.set_agentVersion(info.agent_version);
|
||||||
|
message.set_protocolVersion(info.protocol_version);
|
||||||
|
message.set_publicKey(info.public_key);
|
||||||
|
message.set_listenAddrs(listen_addrs);
|
||||||
|
message.set_observedAddr(observed_addr.to_bytes());
|
||||||
|
message.set_protocols(RepeatedField::from_vec(info.protocols));
|
||||||
|
|
||||||
|
let bytes = message
|
||||||
|
.write_to_bytes()
|
||||||
|
.expect("writing protobuf failed ; should never happen");
|
||||||
|
|
||||||
|
let future = self.inner.send(bytes).map(|_| ());
|
||||||
|
Box::new(future) as Box<_>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Information sent from the listener to the dialer.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct IdentifyInfo {
|
||||||
|
/// Public key of the node in the DER format.
|
||||||
|
pub public_key: Vec<u8>,
|
||||||
|
/// Version of the "global" protocol, eg. `ipfs/1.0.0` or `polkadot/1.0.0`.
|
||||||
|
pub protocol_version: String,
|
||||||
|
/// Name and version of the client. Can be thought as similar to the `User-Agent` header
|
||||||
|
/// of HTTP.
|
||||||
|
pub agent_version: String,
|
||||||
|
/// Addresses that the node is listening on.
|
||||||
|
pub listen_addrs: Vec<Multiaddr>,
|
||||||
|
/// Protocols supported by the node, eg. `/ipfs/ping/1.0.0`.
|
||||||
|
pub protocols: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<C> ConnectionUpgrade<C> for IdentifyProtocolConfig
|
||||||
|
where
|
||||||
|
C: AsyncRead + AsyncWrite + 'static,
|
||||||
|
{
|
||||||
|
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
|
||||||
|
type UpgradeIdentifier = ();
|
||||||
|
type Output = IdentifyOutput<C>;
|
||||||
|
type Future = Box<Future<Item = Self::Output, Error = IoError>>;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn protocol_names(&self) -> Self::NamesIter {
|
||||||
|
iter::once((Bytes::from("/ipfs/id/1.0.0"), ()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn upgrade(self, socket: C, _: (), ty: Endpoint, observed_addr: &Multiaddr) -> Self::Future {
|
||||||
|
let socket = socket.framed(VarintCodec::default());
|
||||||
|
|
||||||
|
match ty {
|
||||||
|
Endpoint::Dialer => {
|
||||||
|
let future = socket
|
||||||
|
.into_future()
|
||||||
|
.map(|(msg, _)| msg)
|
||||||
|
.map_err(|(err, _)| err)
|
||||||
|
.and_then(|msg| {
|
||||||
|
if let Some(msg) = msg {
|
||||||
|
let (info, observed_addr) = parse_proto_msg(msg)?;
|
||||||
|
Ok(IdentifyOutput::RemoteInfo {
|
||||||
|
info,
|
||||||
|
observed_addr,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
Err(IoErrorKind::InvalidData.into())
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Box::new(future) as Box<_>
|
||||||
|
}
|
||||||
|
|
||||||
|
Endpoint::Listener => {
|
||||||
|
let sender = IdentifySender { inner: socket };
|
||||||
|
|
||||||
|
let future = future::ok(IdentifyOutput::Sender {
|
||||||
|
sender,
|
||||||
|
observed_addr: observed_addr.clone(),
|
||||||
|
});
|
||||||
|
|
||||||
|
Box::new(future) as Box<_>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Turns a protobuf message into an `IdentifyInfo` and an observed address. If something bad
|
||||||
|
// happens, turn it into an `IoError`.
|
||||||
|
fn parse_proto_msg(msg: BytesMut) -> Result<(IdentifyInfo, Multiaddr), IoError> {
|
||||||
|
match protobuf_parse_from_bytes::<structs_proto::Identify>(&msg) {
|
||||||
|
Ok(mut msg) => {
|
||||||
|
// Turn a `Vec<u8>` into a `Multiaddr`. If something bad happens, turn it into
|
||||||
|
// an `IoError`.
|
||||||
|
fn bytes_to_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, IoError> {
|
||||||
|
Multiaddr::from_bytes(bytes)
|
||||||
|
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
|
||||||
|
}
|
||||||
|
|
||||||
|
let listen_addrs = {
|
||||||
|
let mut addrs = Vec::new();
|
||||||
|
for addr in msg.take_listenAddrs().into_iter() {
|
||||||
|
addrs.push(bytes_to_multiaddr(addr)?);
|
||||||
|
}
|
||||||
|
addrs
|
||||||
|
};
|
||||||
|
|
||||||
|
let observed_addr = bytes_to_multiaddr(msg.take_observedAddr())?;
|
||||||
|
|
||||||
|
let info = IdentifyInfo {
|
||||||
|
public_key: msg.take_publicKey(),
|
||||||
|
protocol_version: msg.take_protocolVersion(),
|
||||||
|
agent_version: msg.take_agentVersion(),
|
||||||
|
listen_addrs: listen_addrs,
|
||||||
|
protocols: msg.take_protocols().into_vec(),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((info, observed_addr))
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(err) => Err(IoError::new(IoErrorKind::InvalidData, err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
extern crate libp2p_tcp_transport;
|
||||||
|
extern crate tokio_core;
|
||||||
|
|
||||||
|
use self::libp2p_tcp_transport::TcpConfig;
|
||||||
|
use self::tokio_core::reactor::Core;
|
||||||
|
use {IdentifyProtocolConfig, IdentifyOutput, IdentifyInfo};
|
||||||
|
use futures::{Future, Stream};
|
||||||
|
use libp2p_swarm::Transport;
|
||||||
|
use std::sync::mpsc;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn correct_transfer() {
|
||||||
|
// We open a server and a client, send info from the server to the client, and check that
|
||||||
|
// they were successfully received.
|
||||||
|
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
|
let bg_thread = thread::spawn(move || {
|
||||||
|
let mut core = Core::new().unwrap();
|
||||||
|
let transport = TcpConfig::new(core.handle())
|
||||||
|
.with_upgrade(IdentifyProtocolConfig);
|
||||||
|
|
||||||
|
let (listener, addr) = transport
|
||||||
|
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
|
||||||
|
tx.send(addr).unwrap();
|
||||||
|
|
||||||
|
let future = listener
|
||||||
|
.into_future()
|
||||||
|
.map_err(|(err, _)| err)
|
||||||
|
.and_then(|(client, _)| client.unwrap().map(|v| v.0))
|
||||||
|
.and_then(|identify| {
|
||||||
|
match identify {
|
||||||
|
IdentifyOutput::Sender { sender, .. } => {
|
||||||
|
sender.send(IdentifyInfo {
|
||||||
|
public_key: vec![1, 2, 3, 4, 5, 7],
|
||||||
|
protocol_version: "proto_version".to_owned(),
|
||||||
|
agent_version: "agent_version".to_owned(),
|
||||||
|
listen_addrs: vec![
|
||||||
|
"/ip4/80.81.82.83/tcp/500".parse().unwrap(),
|
||||||
|
"/ip6/::1/udp/1000".parse().unwrap()
|
||||||
|
],
|
||||||
|
protocols: vec!["proto1".to_string(), "proto2".to_string()],
|
||||||
|
}, &"/ip4/100.101.102.103/tcp/5000".parse().unwrap())
|
||||||
|
},
|
||||||
|
_ => panic!()
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let _ = core.run(future).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut core = Core::new().unwrap();
|
||||||
|
let transport = TcpConfig::new(core.handle())
|
||||||
|
.with_upgrade(IdentifyProtocolConfig);
|
||||||
|
|
||||||
|
let future = transport
|
||||||
|
.dial(rx.recv().unwrap())
|
||||||
|
.unwrap_or_else(|_| panic!())
|
||||||
|
.and_then(|(identify, _)| {
|
||||||
|
match identify {
|
||||||
|
IdentifyOutput::RemoteInfo { info, observed_addr } => {
|
||||||
|
assert_eq!(observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap());
|
||||||
|
assert_eq!(info.public_key, &[1, 2, 3, 4, 5, 7]);
|
||||||
|
assert_eq!(info.protocol_version, "proto_version");
|
||||||
|
assert_eq!(info.agent_version, "agent_version");
|
||||||
|
assert_eq!(info.listen_addrs, &[
|
||||||
|
"/ip4/80.81.82.83/tcp/500".parse().unwrap(),
|
||||||
|
"/ip6/::1/udp/1000".parse().unwrap()
|
||||||
|
]);
|
||||||
|
assert_eq!(info.protocols, &["proto1".to_string(), "proto2".to_string()]);
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
_ => panic!()
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let _ = core.run(future).unwrap();
|
||||||
|
bg_thread.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
384
libp2p-identify/src/transport.rs
Normal file
384
libp2p-identify/src/transport.rs
Normal file
@ -0,0 +1,384 @@
|
|||||||
|
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
use futures::{stream, Future, IntoFuture, Stream};
|
||||||
|
use libp2p_peerstore::{PeerAccess, PeerId, Peerstore};
|
||||||
|
use libp2p_swarm::{MuxedTransport, Transport};
|
||||||
|
use multiaddr::{AddrComponent, Multiaddr};
|
||||||
|
use protocol::{IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig};
|
||||||
|
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||||
|
use std::ops::Deref;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
/// Implementation of `Transport`. See [the crate root description](index.html).
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct IdentifyTransport<Trans, PStoreRef> {
|
||||||
|
transport: Trans,
|
||||||
|
peerstore: PStoreRef,
|
||||||
|
addr_ttl: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Trans, PStoreRef> IdentifyTransport<Trans, PStoreRef> {
|
||||||
|
/// Creates an `IdentifyTransport` that wraps around the given transport and peerstore.
|
||||||
|
#[inline]
|
||||||
|
pub fn new(transport: Trans, peerstore: PStoreRef) -> Self {
|
||||||
|
IdentifyTransport::with_ttl(transport, peerstore, Duration::from_secs(3600))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Same as `new`, but allows specifying a time-to-live for the addresses gathered from
|
||||||
|
/// remotes that connect to us.
|
||||||
|
///
|
||||||
|
/// The default value is one hour.
|
||||||
|
#[inline]
|
||||||
|
pub fn with_ttl(transport: Trans, peerstore: PStoreRef, ttl: Duration) -> Self {
|
||||||
|
IdentifyTransport {
|
||||||
|
transport: transport,
|
||||||
|
peerstore: peerstore,
|
||||||
|
addr_ttl: ttl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Trans, PStore, PStoreRef> Transport for IdentifyTransport<Trans, PStoreRef>
|
||||||
|
where
|
||||||
|
Trans: Transport + Clone + 'static, // TODO: 'static :(
|
||||||
|
PStoreRef: Deref<Target = PStore> + Clone + 'static, // TODO: 'static :(
|
||||||
|
for<'r> &'r PStore: Peerstore,
|
||||||
|
{
|
||||||
|
type RawConn = Trans::RawConn;
|
||||||
|
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
||||||
|
type ListenerUpgrade = Box<Future<Item = (Trans::RawConn, Multiaddr), Error = IoError>>;
|
||||||
|
type Dial = Box<Future<Item = (Trans::RawConn, Multiaddr), Error = IoError>>;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
|
// Note that `listen_on` expects a "regular" multiaddr (eg. `/ip/.../tcp/...`),
|
||||||
|
// and not `/p2p/<foo>`.
|
||||||
|
|
||||||
|
let (listener, new_addr) = match self.transport.clone().listen_on(addr.clone()) {
|
||||||
|
Ok((l, a)) => (l, a),
|
||||||
|
Err((inner, addr)) => {
|
||||||
|
let id = IdentifyTransport {
|
||||||
|
transport: inner,
|
||||||
|
peerstore: self.peerstore,
|
||||||
|
addr_ttl: self.addr_ttl,
|
||||||
|
};
|
||||||
|
return Err((id, addr));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let identify_upgrade = self.transport.with_upgrade(IdentifyProtocolConfig);
|
||||||
|
let peerstore = self.peerstore;
|
||||||
|
let addr_ttl = self.addr_ttl;
|
||||||
|
|
||||||
|
let listener = listener.map(move |connec| {
|
||||||
|
let peerstore = peerstore.clone();
|
||||||
|
let identify_upgrade = identify_upgrade.clone();
|
||||||
|
let fut = connec
|
||||||
|
.and_then(move |(connec, client_addr)| {
|
||||||
|
// Dial the address that connected to us and try upgrade with the
|
||||||
|
// identify protocol.
|
||||||
|
identify_upgrade
|
||||||
|
.clone()
|
||||||
|
.dial(client_addr.clone())
|
||||||
|
.map_err(|_| {
|
||||||
|
IoError::new(IoErrorKind::Other, "couldn't dial back incoming node")
|
||||||
|
})
|
||||||
|
.map(move |id| (id, connec))
|
||||||
|
})
|
||||||
|
.and_then(move |(dial, connec)| dial.map(move |dial| (dial, connec)))
|
||||||
|
.and_then(move |((identify, original_addr), connec)| {
|
||||||
|
// Compute the "real" address of the node (in the form `/p2p/...`) and add
|
||||||
|
// it to the peerstore.
|
||||||
|
let real_addr = match identify {
|
||||||
|
IdentifyOutput::RemoteInfo { info, .. } => process_identify_info(
|
||||||
|
&info,
|
||||||
|
&*peerstore.clone(),
|
||||||
|
original_addr,
|
||||||
|
addr_ttl,
|
||||||
|
)?,
|
||||||
|
_ => unreachable!(
|
||||||
|
"the identify protocol guarantees that we receive \
|
||||||
|
remote information when we dial a node"
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((connec, real_addr))
|
||||||
|
});
|
||||||
|
|
||||||
|
Box::new(fut) as Box<Future<Item = _, Error = _>>
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok((Box::new(listener) as Box<_>, new_addr))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||||
|
match multiaddr_to_peerid(addr.clone()) {
|
||||||
|
Ok(peer_id) => {
|
||||||
|
// If the multiaddress is a peer ID, try each known multiaddress (taken from the
|
||||||
|
// peerstore) one by one.
|
||||||
|
let addrs = self.peerstore
|
||||||
|
.deref()
|
||||||
|
.peer(&peer_id)
|
||||||
|
.into_iter()
|
||||||
|
.flat_map(|peer| peer.addrs())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.into_iter();
|
||||||
|
|
||||||
|
let transport = self.transport;
|
||||||
|
let future = stream::iter_ok(addrs)
|
||||||
|
// Try to dial each address through the transport.
|
||||||
|
.filter_map(move |addr| transport.clone().dial(addr).ok())
|
||||||
|
.and_then(move |dial| dial)
|
||||||
|
// Pick the first non-failing dial result.
|
||||||
|
.then(|res| Ok(res))
|
||||||
|
.filter_map(|res| res.ok())
|
||||||
|
.into_future()
|
||||||
|
.map_err(|(err, _)| err)
|
||||||
|
.and_then(|(val, _)| val.ok_or(IoErrorKind::InvalidData.into())) // TODO: wrong error
|
||||||
|
.map(move |(socket, _inner_client_addr)| (socket, addr));
|
||||||
|
|
||||||
|
Ok(Box::new(future) as Box<_>)
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(addr) => {
|
||||||
|
// If the multiaddress is something else, propagate it to the underlying transport
|
||||||
|
// and identify the node.
|
||||||
|
let transport = self.transport;
|
||||||
|
let identify_upgrade = transport.clone().with_upgrade(IdentifyProtocolConfig);
|
||||||
|
|
||||||
|
// We dial a first time the node and upgrade it to identify.
|
||||||
|
let dial = match identify_upgrade.dial(addr) {
|
||||||
|
Ok(d) => d,
|
||||||
|
Err((_, addr)) => {
|
||||||
|
let id = IdentifyTransport {
|
||||||
|
transport,
|
||||||
|
peerstore: self.peerstore,
|
||||||
|
addr_ttl: self.addr_ttl,
|
||||||
|
};
|
||||||
|
return Err((id, addr));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let peerstore = self.peerstore;
|
||||||
|
let addr_ttl = self.addr_ttl;
|
||||||
|
|
||||||
|
let future = dial.and_then(move |identify| {
|
||||||
|
// On success, store the information in the peerstore and compute the
|
||||||
|
// "real" address of the node (of the form `/p2p/...`).
|
||||||
|
let (real_addr, old_addr);
|
||||||
|
match identify {
|
||||||
|
(IdentifyOutput::RemoteInfo { info, .. }, a) => {
|
||||||
|
old_addr = a.clone();
|
||||||
|
real_addr = process_identify_info(&info, &*peerstore, a, addr_ttl)?;
|
||||||
|
}
|
||||||
|
_ => unreachable!(
|
||||||
|
"the identify protocol guarantees that we receive \
|
||||||
|
remote information when we dial a node"
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Then dial the same node again.
|
||||||
|
Ok(transport
|
||||||
|
.dial(old_addr)
|
||||||
|
.unwrap_or_else(|_| {
|
||||||
|
panic!("the same multiaddr was determined to be valid earlier")
|
||||||
|
})
|
||||||
|
.into_future()
|
||||||
|
.map(move |(dial, _wrong_addr)| (dial, real_addr)))
|
||||||
|
}).flatten();
|
||||||
|
|
||||||
|
Ok(Box::new(future) as Box<_>)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option<Multiaddr> {
|
||||||
|
self.transport.nat_traversal(a, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Trans, PStore, PStoreRef> MuxedTransport for IdentifyTransport<Trans, PStoreRef>
|
||||||
|
where
|
||||||
|
Trans: MuxedTransport + Clone + 'static,
|
||||||
|
PStoreRef: Deref<Target = PStore> + Clone + 'static,
|
||||||
|
for<'r> &'r PStore: Peerstore,
|
||||||
|
{
|
||||||
|
type Incoming = Box<Future<Item = (Trans::RawConn, Multiaddr), Error = IoError>>;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn next_incoming(self) -> Self::Incoming {
|
||||||
|
let identify_upgrade = self.transport.clone().with_upgrade(IdentifyProtocolConfig);
|
||||||
|
let peerstore = self.peerstore;
|
||||||
|
let addr_ttl = self.addr_ttl;
|
||||||
|
|
||||||
|
let future = self.transport
|
||||||
|
.next_incoming()
|
||||||
|
.and_then(move |(connec, client_addr)| {
|
||||||
|
// On an incoming connection, dial back the node and upgrade to the identify
|
||||||
|
// protocol.
|
||||||
|
identify_upgrade
|
||||||
|
.clone()
|
||||||
|
.dial(client_addr.clone())
|
||||||
|
.map_err(|_| {
|
||||||
|
IoError::new(IoErrorKind::Other, "couldn't dial back incoming node")
|
||||||
|
})
|
||||||
|
.map(move |id| (id, connec))
|
||||||
|
})
|
||||||
|
.and_then(move |(dial, connec)| dial.map(move |dial| (dial, connec)))
|
||||||
|
.and_then(move |(identify, connec)| {
|
||||||
|
// Add the info to the peerstore and compute the "real" address of the node (in
|
||||||
|
// the form `/p2p/...`).
|
||||||
|
let real_addr = match identify {
|
||||||
|
(IdentifyOutput::RemoteInfo { info, .. }, old_addr) => {
|
||||||
|
process_identify_info(&info, &*peerstore, old_addr, addr_ttl)?
|
||||||
|
}
|
||||||
|
_ => unreachable!(
|
||||||
|
"the identify protocol guarantees that we receive remote \
|
||||||
|
information when we dial a node"
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((connec, real_addr))
|
||||||
|
});
|
||||||
|
|
||||||
|
Box::new(future) as Box<_>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the multiaddress is in the form `/p2p/...`, turn it into a `PeerId`.
|
||||||
|
// Otherwise, return it as-is.
|
||||||
|
fn multiaddr_to_peerid(addr: Multiaddr) -> Result<PeerId, Multiaddr> {
|
||||||
|
let components = addr.iter().collect::<Vec<_>>();
|
||||||
|
if components.len() < 1 {
|
||||||
|
return Err(addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
match components.last() {
|
||||||
|
Some(&AddrComponent::P2P(ref peer_id)) |
|
||||||
|
Some(&AddrComponent::IPFS(ref peer_id)) => {
|
||||||
|
// TODO: `peer_id` is sometimes in fact a CID here
|
||||||
|
match PeerId::from_bytes(peer_id.clone()) {
|
||||||
|
Ok(peer_id) => Ok(peer_id),
|
||||||
|
Err(_) => Err(addr),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => Err(addr),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// When passed the information sent by a remote, inserts the remote into the given peerstore and
|
||||||
|
// returns a multiaddr of the format `/p2p/...` corresponding to this node.
|
||||||
|
//
|
||||||
|
// > **Note**: This function is highly-specific, but this precise behaviour is needed in multiple
|
||||||
|
// > different places in the code.
|
||||||
|
fn process_identify_info<P>(
|
||||||
|
info: &IdentifyInfo,
|
||||||
|
peerstore: P,
|
||||||
|
client_addr: Multiaddr,
|
||||||
|
ttl: Duration,
|
||||||
|
) -> Result<Multiaddr, IoError>
|
||||||
|
where
|
||||||
|
P: Peerstore,
|
||||||
|
{
|
||||||
|
let peer_id = PeerId::from_public_key(&info.public_key);
|
||||||
|
peerstore
|
||||||
|
.peer_or_create(&peer_id)
|
||||||
|
.add_addr(client_addr, ttl);
|
||||||
|
Ok(AddrComponent::P2P(peer_id.into_bytes()).into())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
extern crate libp2p_tcp_transport;
|
||||||
|
extern crate tokio_core;
|
||||||
|
|
||||||
|
use self::libp2p_tcp_transport::TcpConfig;
|
||||||
|
use self::tokio_core::reactor::Core;
|
||||||
|
use IdentifyTransport;
|
||||||
|
use futures::{Future, Stream};
|
||||||
|
use libp2p_peerstore::{PeerAccess, PeerId, Peerstore};
|
||||||
|
use libp2p_peerstore::memory_peerstore::MemoryPeerstore;
|
||||||
|
use libp2p_swarm::Transport;
|
||||||
|
use multiaddr::{AddrComponent, Multiaddr};
|
||||||
|
use std::io::Error as IoError;
|
||||||
|
use std::iter;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dial_peer_id() {
|
||||||
|
// When we dial an `/p2p/...` address, the `IdentifyTransport` should look into the
|
||||||
|
// peerstore and dial one of the known multiaddresses of the node instead.
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct UnderlyingTrans {
|
||||||
|
inner: TcpConfig,
|
||||||
|
}
|
||||||
|
impl Transport for UnderlyingTrans {
|
||||||
|
type RawConn = <TcpConfig as Transport>::RawConn;
|
||||||
|
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
||||||
|
type ListenerUpgrade = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
|
||||||
|
type Dial = <TcpConfig as Transport>::Dial;
|
||||||
|
#[inline]
|
||||||
|
fn listen_on(
|
||||||
|
self,
|
||||||
|
_: Multiaddr,
|
||||||
|
) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
|
#[inline]
|
||||||
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||||
|
assert_eq!(
|
||||||
|
addr,
|
||||||
|
"/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()
|
||||||
|
);
|
||||||
|
Ok(self.inner.dial(addr).unwrap_or_else(|_| panic!()))
|
||||||
|
}
|
||||||
|
#[inline]
|
||||||
|
fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option<Multiaddr> {
|
||||||
|
self.inner.nat_traversal(a, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let peer_id = PeerId::from_public_key(&vec![1, 2, 3, 4]);
|
||||||
|
|
||||||
|
let peerstore = MemoryPeerstore::empty();
|
||||||
|
peerstore.peer_or_create(&peer_id).add_addr(
|
||||||
|
"/ip4/127.0.0.1/tcp/12345".parse().unwrap(),
|
||||||
|
Duration::from_secs(3600),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut core = Core::new().unwrap();
|
||||||
|
let underlying = UnderlyingTrans {
|
||||||
|
inner: TcpConfig::new(core.handle()),
|
||||||
|
};
|
||||||
|
let transport = IdentifyTransport::new(underlying, Arc::new(peerstore));
|
||||||
|
|
||||||
|
let future = transport
|
||||||
|
.dial(iter::once(AddrComponent::P2P(peer_id.into_bytes())).collect())
|
||||||
|
.unwrap_or_else(|_| panic!())
|
||||||
|
.then::<_, Result<(), ()>>(|_| Ok(()));
|
||||||
|
|
||||||
|
let _ = core.run(future).unwrap();
|
||||||
|
}
|
||||||
|
}
|
@ -32,6 +32,7 @@ use std::sync::{Mutex, MutexGuard};
|
|||||||
use std::vec::IntoIter as VecIntoIter;
|
use std::vec::IntoIter as VecIntoIter;
|
||||||
|
|
||||||
/// Implementation of the `Peerstore` trait that simply stores the peer information in memory.
|
/// Implementation of the `Peerstore` trait that simply stores the peer information in memory.
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct MemoryPeerstore {
|
pub struct MemoryPeerstore {
|
||||||
store: Mutex<HashMap<PeerId, PeerInfo>>,
|
store: Mutex<HashMap<PeerId, PeerInfo>>,
|
||||||
}
|
}
|
||||||
|
@ -44,7 +44,7 @@ let mut core = tokio_core::reactor::Core::new().unwrap();
|
|||||||
let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
|
let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
|
||||||
.with_upgrade(Ping)
|
.with_upgrade(Ping)
|
||||||
.dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
|
.dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
|
||||||
.and_then(|(mut pinger, service)| {
|
.and_then(|((mut pinger, service), _)| {
|
||||||
pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
|
pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -68,7 +68,7 @@
|
|||||||
//! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
|
//! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
|
||||||
//! .with_upgrade(Ping)
|
//! .with_upgrade(Ping)
|
||||||
//! .dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
|
//! .dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
|
||||||
//! .and_then(|(mut pinger, service)| {
|
//! .and_then(|((mut pinger, service), _)| {
|
||||||
//! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
|
//! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
|
||||||
//! });
|
//! });
|
||||||
//!
|
//!
|
||||||
|
@ -39,7 +39,7 @@ let transport = TcpConfig::new(core.handle())
|
|||||||
|
|
||||||
let future = transport.dial("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap())
|
let future = transport.dial("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap())
|
||||||
.unwrap_or_else(|_| panic!("Unable to dial node"))
|
.unwrap_or_else(|_| panic!("Unable to dial node"))
|
||||||
.and_then(|connection| {
|
.and_then(|(connection, _)| {
|
||||||
// Sends "hello world" on the connection, will be encrypted.
|
// Sends "hello world" on the connection, will be encrypted.
|
||||||
write_all(connection, "hello world")
|
write_all(connection, "hello world")
|
||||||
});
|
});
|
||||||
|
@ -60,7 +60,7 @@
|
|||||||
//!
|
//!
|
||||||
//! let future = transport.dial("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap())
|
//! let future = transport.dial("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap())
|
||||||
//! .unwrap_or_else(|_| panic!("Unable to dial node"))
|
//! .unwrap_or_else(|_| panic!("Unable to dial node"))
|
||||||
//! .and_then(|connection| {
|
//! .and_then(|(connection, _)| {
|
||||||
//! // Sends "hello world" on the connection, will be encrypted.
|
//! // Sends "hello world" on the connection, will be encrypted.
|
||||||
//! write_all(connection, "hello world")
|
//! write_all(connection, "hello world")
|
||||||
//! });
|
//! });
|
||||||
|
@ -125,7 +125,7 @@ let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
|
|||||||
// TODO: right now the only available protocol is ping, but we want to replace it with
|
// TODO: right now the only available protocol is ping, but we want to replace it with
|
||||||
// something that is more simple to use
|
// something that is more simple to use
|
||||||
.dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
|
.dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
|
||||||
.and_then(|(mut pinger, service)| {
|
.and_then(|((mut pinger, service), _)| {
|
||||||
pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
|
pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -105,9 +105,9 @@ where
|
|||||||
C::NamesIter: Clone, // TODO: not elegant
|
C::NamesIter: Clone, // TODO: not elegant
|
||||||
{
|
{
|
||||||
type RawConn = <C::Output as StreamMuxer>::Substream;
|
type RawConn = <C::Output as StreamMuxer>::Substream;
|
||||||
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>;
|
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
||||||
type ListenerUpgrade = FutureResult<Self::RawConn, IoError>;
|
type ListenerUpgrade = FutureResult<(Self::RawConn, Multiaddr), IoError>;
|
||||||
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
type Dial = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
|
||||||
|
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
let (listener, new_addr) = match self.inner.listen_on(addr.clone()) {
|
let (listener, new_addr) = match self.inner.listen_on(addr.clone()) {
|
||||||
@ -141,7 +141,7 @@ where
|
|||||||
let ingoing = dial.clone()
|
let ingoing = dial.clone()
|
||||||
.map(|muxer| stream::repeat(muxer))
|
.map(|muxer| stream::repeat(muxer))
|
||||||
.flatten_stream()
|
.flatten_stream()
|
||||||
.map(move |muxer| ((&*muxer).clone(), addr.clone()));
|
.map(move |muxer| (&*muxer).clone());
|
||||||
|
|
||||||
let mut lock = self.shared.lock();
|
let mut lock = self.shared.lock();
|
||||||
lock.incoming.push(Box::new(ingoing) as Box<_>);
|
lock.incoming.push(Box::new(ingoing) as Box<_>);
|
||||||
@ -150,7 +150,10 @@ where
|
|||||||
|
|
||||||
let future = dial
|
let future = dial
|
||||||
.map_err(|err| err.lock().take().expect("error can only be extracted once"))
|
.map_err(|err| err.lock().take().expect("error can only be extracted once"))
|
||||||
.and_then(|dial| (&*dial).clone().outbound());
|
.and_then(|dial| {
|
||||||
|
let (dial, client_addr) = (&*dial).clone();
|
||||||
|
dial.outbound().map(|s| (s, client_addr))
|
||||||
|
});
|
||||||
Ok(Box::new(future) as Box<_>)
|
Ok(Box::new(future) as Box<_>)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -184,27 +187,27 @@ where
|
|||||||
/// `ConnectionReuse` struct.
|
/// `ConnectionReuse` struct.
|
||||||
pub struct ConnectionReuseListener<S, F, M>
|
pub struct ConnectionReuseListener<S, F, M>
|
||||||
where
|
where
|
||||||
S: Stream<Item = (F, Multiaddr), Error = IoError>,
|
S: Stream<Item = F, Error = IoError>,
|
||||||
F: Future<Item = M, Error = IoError>,
|
F: Future<Item = (M, Multiaddr), Error = IoError>,
|
||||||
M: StreamMuxer,
|
M: StreamMuxer,
|
||||||
{
|
{
|
||||||
listener: StreamFuse<S>,
|
listener: StreamFuse<S>,
|
||||||
current_upgrades: Vec<(F, Multiaddr)>,
|
current_upgrades: Vec<F>,
|
||||||
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
|
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, F, M> Stream for ConnectionReuseListener<S, F, M>
|
impl<S, F, M> Stream for ConnectionReuseListener<S, F, M>
|
||||||
where S: Stream<Item = (F, Multiaddr), Error = IoError>,
|
where S: Stream<Item = F, Error = IoError>,
|
||||||
F: Future<Item = M, Error = IoError>,
|
F: Future<Item = (M, Multiaddr), Error = IoError>,
|
||||||
M: StreamMuxer + Clone + 'static // TODO: 'static :(
|
M: StreamMuxer + Clone + 'static // TODO: 'static :(
|
||||||
{
|
{
|
||||||
type Item = (FutureResult<M::Substream, IoError>, Multiaddr);
|
type Item = FutureResult<(M::Substream, Multiaddr), IoError>;
|
||||||
type Error = IoError;
|
type Error = IoError;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
match self.listener.poll() {
|
match self.listener.poll() {
|
||||||
Ok(Async::Ready(Some((upgrade, client_addr)))) => {
|
Ok(Async::Ready(Some(upgrade))) => {
|
||||||
self.current_upgrades.push((upgrade, client_addr));
|
self.current_upgrades.push(upgrade);
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => (),
|
Ok(Async::NotReady) => (),
|
||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
@ -225,11 +228,11 @@ where S: Stream<Item = (F, Multiaddr), Error = IoError>,
|
|||||||
let mut upgrades_to_drop: SmallVec<[_; 8]> = SmallVec::new();
|
let mut upgrades_to_drop: SmallVec<[_; 8]> = SmallVec::new();
|
||||||
let mut early_ret = None;
|
let mut early_ret = None;
|
||||||
|
|
||||||
for (index, &mut (ref mut current_upgrade, ref mut client_addr)) in
|
for (index, current_upgrade) in
|
||||||
self.current_upgrades.iter_mut().enumerate()
|
self.current_upgrades.iter_mut().enumerate()
|
||||||
{
|
{
|
||||||
match current_upgrade.poll() {
|
match current_upgrade.poll() {
|
||||||
Ok(Async::Ready(muxer)) => {
|
Ok(Async::Ready((muxer, client_addr))) => {
|
||||||
let next_incoming = muxer.clone().inbound();
|
let next_incoming = muxer.clone().inbound();
|
||||||
self.connections.push((muxer, next_incoming, client_addr.clone()));
|
self.connections.push((muxer, next_incoming, client_addr.clone()));
|
||||||
upgrades_to_drop.push(index);
|
upgrades_to_drop.push(index);
|
||||||
@ -237,7 +240,7 @@ where S: Stream<Item = (F, Multiaddr), Error = IoError>,
|
|||||||
Ok(Async::NotReady) => {},
|
Ok(Async::NotReady) => {},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
upgrades_to_drop.push(index);
|
upgrades_to_drop.push(index);
|
||||||
early_ret = Some(Async::Ready(Some((Err(err).into_future(), client_addr.clone()))));
|
early_ret = Some(Async::Ready(Some(Err(err).into_future())));
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -261,7 +264,7 @@ where S: Stream<Item = (F, Multiaddr), Error = IoError>,
|
|||||||
Ok(Async::Ready(incoming)) => {
|
Ok(Async::Ready(incoming)) => {
|
||||||
let mut new_next = muxer.clone().inbound();
|
let mut new_next = muxer.clone().inbound();
|
||||||
*next_incoming = new_next;
|
*next_incoming = new_next;
|
||||||
return Ok(Async::Ready(Some((Ok(incoming).into_future(), client_addr.clone()))));
|
return Ok(Async::Ready(Some(Ok((incoming, client_addr.clone())).into_future())));
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => {}
|
Ok(Async::NotReady) => {}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
|
@ -149,7 +149,7 @@
|
|||||||
//! // TODO: right now the only available protocol is ping, but we want to replace it with
|
//! // TODO: right now the only available protocol is ping, but we want to replace it with
|
||||||
//! // something that is more simple to use
|
//! // something that is more simple to use
|
||||||
//! .dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
|
//! .dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
|
||||||
//! .and_then(|(mut pinger, service)| {
|
//! .and_then(|((mut pinger, service), _)| {
|
||||||
//! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
|
//! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
|
||||||
//! });
|
//! });
|
||||||
//!
|
//!
|
||||||
|
@ -76,8 +76,8 @@ pub struct SwarmController<T, C>
|
|||||||
{
|
{
|
||||||
transport: T,
|
transport: T,
|
||||||
upgraded: UpgradedNode<T, C>,
|
upgraded: UpgradedNode<T, C>,
|
||||||
new_listeners: mpsc::UnboundedSender<Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError>>, Multiaddr), Error = IoError>>>,
|
new_listeners: mpsc::UnboundedSender<Box<Stream<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Error = IoError>>>,
|
||||||
new_dialers: mpsc::UnboundedSender<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
|
new_dialers: mpsc::UnboundedSender<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
|
||||||
new_toprocess: mpsc::UnboundedSender<Box<Future<Item = (), Error = IoError>>>,
|
new_toprocess: mpsc::UnboundedSender<Box<Future<Item = (), Error = IoError>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,10 +96,10 @@ impl<T, C> SwarmController<T, C>
|
|||||||
{
|
{
|
||||||
match self.transport.clone().with_upgrade(upgrade).dial(multiaddr.clone()) {
|
match self.transport.clone().with_upgrade(upgrade).dial(multiaddr.clone()) {
|
||||||
Ok(dial) => {
|
Ok(dial) => {
|
||||||
let dial = Box::new(dial.map(Into::into)) as Box<Future<Item = _, Error = _>>;
|
let dial = Box::new(dial.map(|(d, client_addr)| (d.into(), client_addr))) as Box<Future<Item = _, Error = _>>;
|
||||||
// Ignoring errors if the receiver has been closed, because in that situation
|
// Ignoring errors if the receiver has been closed, because in that situation
|
||||||
// nothing is going to be processed anyway.
|
// nothing is going to be processed anyway.
|
||||||
let _ = self.new_dialers.unbounded_send((dial, multiaddr));
|
let _ = self.new_dialers.unbounded_send(dial);
|
||||||
Ok(())
|
Ok(())
|
||||||
},
|
},
|
||||||
Err((_, multiaddr)) => {
|
Err((_, multiaddr)) => {
|
||||||
@ -117,12 +117,12 @@ impl<T, C> SwarmController<T, C>
|
|||||||
pub fn dial_custom_handler<Du, Df, Dfu>(&self, multiaddr: Multiaddr, upgrade: Du, and_then: Df)
|
pub fn dial_custom_handler<Du, Df, Dfu>(&self, multiaddr: Multiaddr, upgrade: Du, and_then: Df)
|
||||||
-> Result<(), Multiaddr>
|
-> Result<(), Multiaddr>
|
||||||
where Du: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
|
where Du: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
|
||||||
Df: FnOnce(Du::Output) -> Dfu + 'static, // TODO: 'static :-/
|
Df: FnOnce(Du::Output, Multiaddr) -> Dfu + 'static, // TODO: 'static :-/
|
||||||
Dfu: IntoFuture<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
|
Dfu: IntoFuture<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
|
||||||
{
|
{
|
||||||
match self.transport.clone().with_upgrade(upgrade).dial(multiaddr) {
|
match self.transport.clone().with_upgrade(upgrade).dial(multiaddr) {
|
||||||
Ok(dial) => {
|
Ok(dial) => {
|
||||||
let dial = Box::new(dial.and_then(and_then)) as Box<_>;
|
let dial = Box::new(dial.and_then(|(d, m)| and_then(d, m))) as Box<_>;
|
||||||
// Ignoring errors if the receiver has been closed, because in that situation
|
// Ignoring errors if the receiver has been closed, because in that situation
|
||||||
// nothing is going to be processed anyway.
|
// nothing is going to be processed anyway.
|
||||||
let _ = self.new_toprocess.unbounded_send(dial);
|
let _ = self.new_toprocess.unbounded_send(dial);
|
||||||
@ -158,12 +158,12 @@ pub struct SwarmFuture<T, C, H, F>
|
|||||||
{
|
{
|
||||||
upgraded: UpgradedNode<T, C>,
|
upgraded: UpgradedNode<T, C>,
|
||||||
handler: H,
|
handler: H,
|
||||||
new_listeners: mpsc::UnboundedReceiver<Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError>>, Multiaddr), Error = IoError>>>,
|
new_listeners: mpsc::UnboundedReceiver<Box<Stream<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Error = IoError>>>,
|
||||||
next_incoming: Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
|
next_incoming: Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
|
||||||
listeners: Vec<Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError>>, Multiaddr), Error = IoError>>>,
|
listeners: Vec<Box<Stream<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Error = IoError>>>,
|
||||||
listeners_upgrade: Vec<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
|
listeners_upgrade: Vec<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
|
||||||
dialers: Vec<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
|
dialers: Vec<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
|
||||||
new_dialers: mpsc::UnboundedReceiver<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
|
new_dialers: mpsc::UnboundedReceiver<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
|
||||||
to_process: Vec<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
|
to_process: Vec<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
|
||||||
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
|
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
|
||||||
}
|
}
|
||||||
@ -203,8 +203,8 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
|
|||||||
};
|
};
|
||||||
|
|
||||||
match self.new_dialers.poll() {
|
match self.new_dialers.poll() {
|
||||||
Ok(Async::Ready(Some((new_dialer, multiaddr)))) => {
|
Ok(Async::Ready(Some(new_dialer))) => {
|
||||||
self.dialers.push((new_dialer, multiaddr));
|
self.dialers.push(new_dialer);
|
||||||
},
|
},
|
||||||
Ok(Async::Ready(None)) | Err(_) => {
|
Ok(Async::Ready(None)) | Err(_) => {
|
||||||
// New dialers sender has been closed.
|
// New dialers sender has been closed.
|
||||||
@ -225,9 +225,9 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
|
|||||||
for n in (0 .. self.listeners.len()).rev() {
|
for n in (0 .. self.listeners.len()).rev() {
|
||||||
let mut listener = self.listeners.swap_remove(n);
|
let mut listener = self.listeners.swap_remove(n);
|
||||||
match listener.poll() {
|
match listener.poll() {
|
||||||
Ok(Async::Ready(Some((upgrade, client_addr)))) => {
|
Ok(Async::Ready(Some(upgrade))) => {
|
||||||
self.listeners.push(listener);
|
self.listeners.push(listener);
|
||||||
self.listeners_upgrade.push((upgrade, client_addr));
|
self.listeners_upgrade.push(upgrade);
|
||||||
},
|
},
|
||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => {
|
||||||
self.listeners.push(listener);
|
self.listeners.push(listener);
|
||||||
@ -238,26 +238,26 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
|
|||||||
}
|
}
|
||||||
|
|
||||||
for n in (0 .. self.listeners_upgrade.len()).rev() {
|
for n in (0 .. self.listeners_upgrade.len()).rev() {
|
||||||
let (mut upgrade, addr) = self.listeners_upgrade.swap_remove(n);
|
let mut upgrade = self.listeners_upgrade.swap_remove(n);
|
||||||
match upgrade.poll() {
|
match upgrade.poll() {
|
||||||
Ok(Async::Ready(output)) => {
|
Ok(Async::Ready((output, client_addr))) => {
|
||||||
self.to_process.push(future::Either::A(handler(output, addr).into_future()));
|
self.to_process.push(future::Either::A(handler(output, client_addr).into_future()));
|
||||||
},
|
},
|
||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => {
|
||||||
self.listeners_upgrade.push((upgrade, addr));
|
self.listeners_upgrade.push(upgrade);
|
||||||
},
|
},
|
||||||
Err(err) => return Err(err),
|
Err(err) => return Err(err),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for n in (0 .. self.dialers.len()).rev() {
|
for n in (0 .. self.dialers.len()).rev() {
|
||||||
let (mut dialer, addr) = self.dialers.swap_remove(n);
|
let mut dialer = self.dialers.swap_remove(n);
|
||||||
match dialer.poll() {
|
match dialer.poll() {
|
||||||
Ok(Async::Ready(output)) => {
|
Ok(Async::Ready((output, addr))) => {
|
||||||
self.to_process.push(future::Either::A(handler(output, addr).into_future()));
|
self.to_process.push(future::Either::A(handler(output, addr).into_future()));
|
||||||
},
|
},
|
||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => {
|
||||||
self.dialers.push((dialer, addr));
|
self.dialers.push(dialer);
|
||||||
},
|
},
|
||||||
Err(err) => return Err(err),
|
Err(err) => return Err(err),
|
||||||
}
|
}
|
||||||
|
@ -60,15 +60,15 @@ pub trait Transport {
|
|||||||
/// An item should be produced whenever a connection is received at the lowest level of the
|
/// An item should be produced whenever a connection is received at the lowest level of the
|
||||||
/// transport stack. The item is a `Future` that is signalled once some pre-processing has
|
/// transport stack. The item is a `Future` that is signalled once some pre-processing has
|
||||||
/// taken place, and that connection has been upgraded to the wanted protocols.
|
/// taken place, and that connection has been upgraded to the wanted protocols.
|
||||||
type Listener: Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>;
|
type Listener: Stream<Item = Self::ListenerUpgrade, Error = IoError>;
|
||||||
|
|
||||||
/// After a connection has been received, we may need to do some asynchronous pre-processing
|
/// After a connection has been received, we may need to do some asynchronous pre-processing
|
||||||
/// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we
|
/// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we
|
||||||
/// want to be able to continue polling on the listener.
|
/// want to be able to continue polling on the listener.
|
||||||
type ListenerUpgrade: Future<Item = Self::RawConn, Error = IoError>;
|
type ListenerUpgrade: Future<Item = (Self::RawConn, Multiaddr), Error = IoError>;
|
||||||
|
|
||||||
/// A future which indicates that we are currently dialing to a peer.
|
/// A future which indicates that we are currently dialing to a peer.
|
||||||
type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
|
type Dial: IntoFuture<Item = (Self::RawConn, Multiaddr), Error = IoError>;
|
||||||
|
|
||||||
/// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified
|
/// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified
|
||||||
/// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised
|
/// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised
|
||||||
@ -174,9 +174,9 @@ pub struct DeniedTransport;
|
|||||||
impl Transport for DeniedTransport {
|
impl Transport for DeniedTransport {
|
||||||
// TODO: could use `!` for associated types once stable
|
// TODO: could use `!` for associated types once stable
|
||||||
type RawConn = Cursor<Vec<u8>>;
|
type RawConn = Cursor<Vec<u8>>;
|
||||||
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>;
|
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
||||||
type ListenerUpgrade = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
type ListenerUpgrade = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
|
||||||
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
type Dial = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
@ -214,9 +214,8 @@ where
|
|||||||
{
|
{
|
||||||
type RawConn = EitherSocket<A::RawConn, B::RawConn>;
|
type RawConn = EitherSocket<A::RawConn, B::RawConn>;
|
||||||
type Listener = EitherListenStream<A::Listener, B::Listener>;
|
type Listener = EitherListenStream<A::Listener, B::Listener>;
|
||||||
type ListenerUpgrade = EitherTransportFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
|
type ListenerUpgrade = EitherListenUpgrade<A::ListenerUpgrade, B::ListenerUpgrade>;
|
||||||
type Dial =
|
type Dial = EitherListenUpgrade<<A::Dial as IntoFuture>::Future, <B::Dial as IntoFuture>::Future>;
|
||||||
EitherTransportFuture<<A::Dial as IntoFuture>::Future, <B::Dial as IntoFuture>::Future>;
|
|
||||||
|
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
let (first, addr) = match self.0.listen_on(addr) {
|
let (first, addr) = match self.0.listen_on(addr) {
|
||||||
@ -232,12 +231,12 @@ where
|
|||||||
|
|
||||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||||
let (first, addr) = match self.0.dial(addr) {
|
let (first, addr) = match self.0.dial(addr) {
|
||||||
Ok(connec) => return Ok(EitherTransportFuture::First(connec.into_future())),
|
Ok(connec) => return Ok(EitherListenUpgrade::First(connec.into_future())),
|
||||||
Err(err) => err,
|
Err(err) => err,
|
||||||
};
|
};
|
||||||
|
|
||||||
match self.1.dial(addr) {
|
match self.1.dial(addr) {
|
||||||
Ok(connec) => Ok(EitherTransportFuture::Second(connec.into_future())),
|
Ok(connec) => Ok(EitherListenUpgrade::Second(connec.into_future())),
|
||||||
Err((second, addr)) => Err((OrTransport(first, second), addr)),
|
Err((second, addr)) => Err((OrTransport(first, second), addr)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -339,19 +338,19 @@ pub enum EitherListenStream<A, B> {
|
|||||||
|
|
||||||
impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
|
impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
|
||||||
where
|
where
|
||||||
AStream: Stream<Item = (AInner, Multiaddr), Error = IoError>,
|
AStream: Stream<Item = AInner, Error = IoError>,
|
||||||
BStream: Stream<Item = (BInner, Multiaddr), Error = IoError>,
|
BStream: Stream<Item = BInner, Error = IoError>,
|
||||||
{
|
{
|
||||||
type Item = (EitherTransportFuture<AInner, BInner>, Multiaddr);
|
type Item = EitherListenUpgrade<AInner, BInner>;
|
||||||
type Error = IoError;
|
type Error = IoError;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
match self {
|
match self {
|
||||||
&mut EitherListenStream::First(ref mut a) => a.poll()
|
&mut EitherListenStream::First(ref mut a) => a.poll()
|
||||||
.map(|i| i.map(|v| v.map(|(s, a)| (EitherTransportFuture::First(s), a)))),
|
.map(|i| i.map(|v| v.map(EitherListenUpgrade::First))),
|
||||||
&mut EitherListenStream::Second(ref mut a) => a.poll()
|
&mut EitherListenStream::Second(ref mut a) => a.poll()
|
||||||
.map(|i| i.map(|v| v.map(|(s, a)| (EitherTransportFuture::Second(s), a)))),
|
.map(|i| i.map(|v| v.map(EitherListenUpgrade::Second))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -384,12 +383,44 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: This type is needed because of the lack of `impl Trait` in stable Rust.
|
||||||
|
// If Rust had impl Trait we could use the Either enum from the futures crate and add some
|
||||||
|
// modifiers to it. This custom enum is a combination of Either and these modifiers.
|
||||||
|
#[derive(Debug, Copy, Clone)]
|
||||||
|
pub enum EitherListenUpgrade<A, B> {
|
||||||
|
First(A),
|
||||||
|
Second(B),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A, B, Ao, Bo> Future for EitherListenUpgrade<A, B>
|
||||||
|
where
|
||||||
|
A: Future<Item = (Ao, Multiaddr), Error = IoError>,
|
||||||
|
B: Future<Item = (Bo, Multiaddr), Error = IoError>,
|
||||||
|
{
|
||||||
|
type Item = (EitherSocket<Ao, Bo>, Multiaddr);
|
||||||
|
type Error = IoError;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
match self {
|
||||||
|
&mut EitherListenUpgrade::First(ref mut a) => {
|
||||||
|
let (item, addr) = try_ready!(a.poll());
|
||||||
|
Ok(Async::Ready((EitherSocket::First(item), addr)))
|
||||||
|
}
|
||||||
|
&mut EitherListenUpgrade::Second(ref mut b) => {
|
||||||
|
let (item, addr) = try_ready!(b.poll());
|
||||||
|
Ok(Async::Ready((EitherSocket::Second(item), addr)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Implements `Future` and redirects calls to either `First` or `Second`.
|
/// Implements `Future` and redirects calls to either `First` or `Second`.
|
||||||
///
|
///
|
||||||
/// Additionally, the output will be wrapped inside a `EitherSocket`.
|
/// Additionally, the output will be wrapped inside a `EitherSocket`.
|
||||||
///
|
// TODO: This type is needed because of the lack of `impl Trait` in stable Rust.
|
||||||
/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be
|
// If Rust had impl Trait we could use the Either enum from the futures crate and add some
|
||||||
/// > removed eventually.
|
// modifiers to it. This custom enum is a combination of Either and these modifiers.
|
||||||
#[derive(Debug, Copy, Clone)]
|
#[derive(Debug, Copy, Clone)]
|
||||||
pub enum EitherTransportFuture<A, B> {
|
pub enum EitherTransportFuture<A, B> {
|
||||||
First(A),
|
First(A),
|
||||||
@ -650,8 +681,9 @@ pub enum EitherUpgradeIdentifier<A, B> {
|
|||||||
///
|
///
|
||||||
/// Additionally, the output will be wrapped inside a `EitherSocket`.
|
/// Additionally, the output will be wrapped inside a `EitherSocket`.
|
||||||
///
|
///
|
||||||
/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be
|
// TODO: This type is needed because of the lack of `impl Trait` in stable Rust.
|
||||||
/// > removed eventually.
|
// If Rust had impl Trait we could use the Either enum from the futures crate and add some
|
||||||
|
// modifiers to it. This custom enum is a combination of Either and these modifiers.
|
||||||
#[derive(Debug, Copy, Clone)]
|
#[derive(Debug, Copy, Clone)]
|
||||||
pub enum EitherConnUpgrFuture<A, B> {
|
pub enum EitherConnUpgrFuture<A, B> {
|
||||||
First(A),
|
First(A),
|
||||||
@ -838,7 +870,7 @@ where
|
|||||||
pub fn dial(
|
pub fn dial(
|
||||||
self,
|
self,
|
||||||
addr: Multiaddr,
|
addr: Multiaddr,
|
||||||
) -> Result<Box<Future<Item = C::Output, Error = IoError> + 'a>, (Self, Multiaddr)> {
|
) -> Result<Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, (Self, Multiaddr)> {
|
||||||
let upgrade = self.upgrade;
|
let upgrade = self.upgrade;
|
||||||
|
|
||||||
let dialed_fut = match self.transports.dial(addr.clone()) {
|
let dialed_fut = match self.transports.dial(addr.clone()) {
|
||||||
@ -855,15 +887,16 @@ where
|
|||||||
|
|
||||||
let future = dialed_fut
|
let future = dialed_fut
|
||||||
// Try to negotiate the protocol.
|
// Try to negotiate the protocol.
|
||||||
.and_then(move |connection| {
|
.and_then(move |(connection, client_addr)| {
|
||||||
let iter = upgrade.protocol_names()
|
let iter = upgrade.protocol_names()
|
||||||
.map(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
|
.map(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
|
||||||
let negotiated = multistream_select::dialer_select_proto(connection, iter)
|
let negotiated = multistream_select::dialer_select_proto(connection, iter)
|
||||||
.map_err(|err| IoError::new(IoErrorKind::Other, err));
|
.map_err(|err| IoError::new(IoErrorKind::Other, err));
|
||||||
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade))
|
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr))
|
||||||
})
|
})
|
||||||
.and_then(move |(upgrade_id, connection, upgrade)| {
|
.and_then(move |(upgrade_id, connection, upgrade, client_addr)| {
|
||||||
upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr)
|
let f = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &client_addr);
|
||||||
|
f.map(|v| (v, client_addr))
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(Box::new(future))
|
Ok(Box::new(future))
|
||||||
@ -908,7 +941,7 @@ where
|
|||||||
self,
|
self,
|
||||||
addr: Multiaddr,
|
addr: Multiaddr,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
(Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError> + 'a>, Multiaddr), Error = IoError> + 'a>, Multiaddr),
|
(Box<Stream<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, Error = IoError> + 'a>, Multiaddr),
|
||||||
(Self, Multiaddr),
|
(Self, Multiaddr),
|
||||||
>
|
>
|
||||||
where
|
where
|
||||||
@ -934,24 +967,24 @@ where
|
|||||||
// Instead the `stream` will produce `Ok(Err(...))`.
|
// Instead the `stream` will produce `Ok(Err(...))`.
|
||||||
// `stream` can only produce an `Err` if `listening_stream` produces an `Err`.
|
// `stream` can only produce an `Err` if `listening_stream` produces an `Err`.
|
||||||
let stream = listening_stream
|
let stream = listening_stream
|
||||||
.map(move |(connection, client_addr)| {
|
.map(move |connection| {
|
||||||
let upgrade = upgrade.clone();
|
let upgrade = upgrade.clone();
|
||||||
let remote_addr = client_addr.clone();
|
|
||||||
let connection = connection
|
let connection = connection
|
||||||
// Try to negotiate the protocol
|
// Try to negotiate the protocol
|
||||||
.and_then(move |connection| {
|
.and_then(move |(connection, remote_addr)| {
|
||||||
let iter = upgrade.protocol_names()
|
let iter = upgrade.protocol_names()
|
||||||
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
|
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
|
||||||
multistream_select::listener_select_proto(connection, iter)
|
multistream_select::listener_select_proto(connection, iter)
|
||||||
.map_err(|err| IoError::new(IoErrorKind::Other, err))
|
.map_err(|err| IoError::new(IoErrorKind::Other, err))
|
||||||
.and_then(move |(upgrade_id, connection)| {
|
.and_then(move |(upgrade_id, connection)| {
|
||||||
upgrade.upgrade(connection, upgrade_id, Endpoint::Listener,
|
let fut = upgrade.upgrade(connection, upgrade_id, Endpoint::Listener,
|
||||||
&remote_addr)
|
&remote_addr);
|
||||||
|
fut.map(move |c| (c, remote_addr))
|
||||||
})
|
})
|
||||||
.into_future()
|
.into_future()
|
||||||
});
|
});
|
||||||
|
|
||||||
(Box::new(connection) as Box<_>, client_addr)
|
Box::new(connection) as Box<_>
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok((Box::new(stream), new_addr))
|
Ok((Box::new(stream), new_addr))
|
||||||
@ -967,9 +1000,9 @@ where
|
|||||||
C: Clone,
|
C: Clone,
|
||||||
{
|
{
|
||||||
type RawConn = C::Output;
|
type RawConn = C::Output;
|
||||||
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>;
|
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
||||||
type ListenerUpgrade = Box<Future<Item = C::Output, Error = IoError>>;
|
type ListenerUpgrade = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
|
||||||
type Dial = Box<Future<Item = C::Output, Error = IoError>>;
|
type Dial = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
|
@ -59,7 +59,7 @@ use std::io::Error as IoError;
|
|||||||
use std::iter;
|
use std::iter;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use tokio_core::reactor::Handle;
|
use tokio_core::reactor::Handle;
|
||||||
use tokio_core::net::{TcpStream, TcpListener, TcpStreamNew};
|
use tokio_core::net::{TcpStream, TcpListener};
|
||||||
use futures::future::{self, Future, FutureResult, IntoFuture};
|
use futures::future::{self, Future, FutureResult, IntoFuture};
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
use multiaddr::{Multiaddr, AddrComponent, ToMultiaddr};
|
use multiaddr::{Multiaddr, AddrComponent, ToMultiaddr};
|
||||||
@ -86,9 +86,9 @@ impl TcpConfig {
|
|||||||
|
|
||||||
impl Transport for TcpConfig {
|
impl Transport for TcpConfig {
|
||||||
type RawConn = TcpStream;
|
type RawConn = TcpStream;
|
||||||
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>;
|
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
||||||
type ListenerUpgrade = FutureResult<Self::RawConn, IoError>;
|
type ListenerUpgrade = FutureResult<(Self::RawConn, Multiaddr), IoError>;
|
||||||
type Dial = TcpStreamNew;
|
type Dial = Box<Future<Item = (TcpStream, Multiaddr), Error = IoError>>;
|
||||||
|
|
||||||
/// Listen on the given multi-addr.
|
/// Listen on the given multi-addr.
|
||||||
/// Returns the address back if it isn't supported.
|
/// Returns the address back if it isn't supported.
|
||||||
@ -112,7 +112,7 @@ impl Transport for TcpConfig {
|
|||||||
listener.incoming().map(|(sock, addr)| {
|
listener.incoming().map(|(sock, addr)| {
|
||||||
let addr = addr.to_multiaddr()
|
let addr = addr.to_multiaddr()
|
||||||
.expect("generating a multiaddr from a socket addr never fails");
|
.expect("generating a multiaddr from a socket addr never fails");
|
||||||
(Ok(sock).into_future(), addr)
|
Ok((sock, addr)).into_future()
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.flatten_stream();
|
.flatten_stream();
|
||||||
@ -127,7 +127,9 @@ impl Transport for TcpConfig {
|
|||||||
/// or gives back the multiaddress.
|
/// or gives back the multiaddress.
|
||||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||||
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
|
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
|
||||||
Ok(TcpStream::connect(&socket_addr, &self.event_loop))
|
let fut = TcpStream::connect(&socket_addr, &self.event_loop)
|
||||||
|
.map(|t| (t, addr));
|
||||||
|
Ok(Box::new(fut) as Box<_>)
|
||||||
} else {
|
} else {
|
||||||
Err((self, addr))
|
Err((self, addr))
|
||||||
}
|
}
|
||||||
@ -249,8 +251,8 @@ mod tests {
|
|||||||
let addr = "/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap();
|
let addr = "/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap();
|
||||||
let tcp = TcpConfig::new(core.handle());
|
let tcp = TcpConfig::new(core.handle());
|
||||||
let handle = core.handle();
|
let handle = core.handle();
|
||||||
let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| {
|
let listener = tcp.listen_on(addr).unwrap().0.for_each(|sock| {
|
||||||
sock.and_then(|sock| {
|
sock.and_then(|(sock, _)| {
|
||||||
// Define what to do with the socket that just connected to us
|
// Define what to do with the socket that just connected to us
|
||||||
// Which in this case is read 3 bytes
|
// Which in this case is read 3 bytes
|
||||||
let handle_conn = tokio_io::io::read_exact(sock, [0; 3])
|
let handle_conn = tokio_io::io::read_exact(sock, [0; 3])
|
||||||
@ -274,7 +276,7 @@ mod tests {
|
|||||||
let socket = tcp.dial(addr.clone()).unwrap();
|
let socket = tcp.dial(addr.clone()).unwrap();
|
||||||
// Define what to do with the socket once it's obtained
|
// Define what to do with the socket once it's obtained
|
||||||
let action = socket.then(|sock| match sock {
|
let action = socket.then(|sock| match sock {
|
||||||
Ok(mut s) => {
|
Ok((mut s, _)) => {
|
||||||
let written = s.write(&[0x1, 0x2, 0x3]).unwrap();
|
let written = s.write(&[0x1, 0x2, 0x3]).unwrap();
|
||||||
Ok(written)
|
Ok(written)
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,7 @@
|
|||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
use futures::{Async, Future, Poll, Stream, Then as FutureThen};
|
use futures::{Async, Future, Poll, Stream};
|
||||||
use futures::stream::Then as StreamThen;
|
use futures::stream::Then as StreamThen;
|
||||||
use futures::sync::{mpsc, oneshot};
|
use futures::sync::{mpsc, oneshot};
|
||||||
use multiaddr::{AddrComponent, Multiaddr};
|
use multiaddr::{AddrComponent, Multiaddr};
|
||||||
@ -49,14 +49,9 @@ impl BrowserWsConfig {
|
|||||||
|
|
||||||
impl Transport for BrowserWsConfig {
|
impl Transport for BrowserWsConfig {
|
||||||
type RawConn = BrowserWsConn;
|
type RawConn = BrowserWsConn;
|
||||||
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>; // TODO: use `!`
|
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>; // TODO: use `!`
|
||||||
type ListenerUpgrade = Box<Future<Item = Self::RawConn, Error = IoError>>; // TODO: use `!`
|
type ListenerUpgrade = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>; // TODO: use `!`
|
||||||
type Dial = FutureThen<
|
type Dial = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
|
||||||
oneshot::Receiver<Result<BrowserWsConn, IoError>>,
|
|
||||||
Result<BrowserWsConn, IoError>,
|
|
||||||
fn(Result<Result<BrowserWsConn, IoError>, oneshot::Canceled>)
|
|
||||||
-> Result<BrowserWsConn, IoError>,
|
|
||||||
>;
|
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
@ -191,9 +186,9 @@ impl Transport for BrowserWsConfig {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(open_rx.then(|result| {
|
Ok(Box::new(open_rx.then(|result| {
|
||||||
match result {
|
match result {
|
||||||
Ok(Ok(r)) => Ok(r),
|
Ok(Ok(r)) => Ok((r, original_addr)),
|
||||||
Ok(Err(e)) => Err(e),
|
Ok(Err(e)) => Err(e),
|
||||||
// `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by
|
// `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by
|
||||||
// the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself
|
// the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself
|
||||||
@ -202,7 +197,7 @@ impl Transport for BrowserWsConfig {
|
|||||||
// TODO: how do we break this cyclic dependency? difficult question
|
// TODO: how do we break this cyclic dependency? difficult question
|
||||||
Err(_) => unreachable!("the sending side will only close when we drop the future"),
|
Err(_) => unreachable!("the sending side will only close when we drop the future"),
|
||||||
}
|
}
|
||||||
}))
|
})) as Box<_>)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
|
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
|
||||||
|
@ -58,10 +58,10 @@ where
|
|||||||
type RawConn = Box<AsyncStream>;
|
type RawConn = Box<AsyncStream>;
|
||||||
type Listener = stream::Map<
|
type Listener = stream::Map<
|
||||||
T::Listener,
|
T::Listener,
|
||||||
fn((<T as Transport>::ListenerUpgrade, Multiaddr)) -> (Self::ListenerUpgrade, Multiaddr),
|
fn(<T as Transport>::ListenerUpgrade) -> Self::ListenerUpgrade,
|
||||||
>;
|
>;
|
||||||
type ListenerUpgrade = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
type ListenerUpgrade = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
|
||||||
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
type Dial = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
|
||||||
|
|
||||||
fn listen_on(
|
fn listen_on(
|
||||||
self,
|
self,
|
||||||
@ -89,12 +89,12 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let listen = inner_listen.map::<_, fn(_) -> _>(|(stream, mut client_addr)| {
|
let listen = inner_listen.map::<_, fn(_) -> _>(|stream| {
|
||||||
// Need to suffix `/ws` to each client address.
|
|
||||||
client_addr.append(AddrComponent::WS);
|
|
||||||
|
|
||||||
// Upgrade the listener to websockets like the websockets library requires us to do.
|
// Upgrade the listener to websockets like the websockets library requires us to do.
|
||||||
let upgraded = stream.and_then(|stream| {
|
let upgraded = stream.and_then(|(stream, mut client_addr)| {
|
||||||
|
// Need to suffix `/ws` to each client address.
|
||||||
|
client_addr.append(AddrComponent::WS);
|
||||||
|
|
||||||
stream
|
stream
|
||||||
.into_ws()
|
.into_ws()
|
||||||
.map_err(|e| IoError::new(IoErrorKind::Other, e.3))
|
.map_err(|e| IoError::new(IoErrorKind::Other, e.3))
|
||||||
@ -130,12 +130,10 @@ where
|
|||||||
.map(|s| Box::new(Ok(s).into_future()) as Box<Future<Item = _, Error = _>>)
|
.map(|s| Box::new(Ok(s).into_future()) as Box<Future<Item = _, Error = _>>)
|
||||||
.into_future()
|
.into_future()
|
||||||
.flatten()
|
.flatten()
|
||||||
|
.map(move |v| (v, client_addr))
|
||||||
});
|
});
|
||||||
|
|
||||||
(
|
Box::new(upgraded) as Box<Future<Item = _, Error = _>>
|
||||||
Box::new(upgraded) as Box<Future<Item = _, Error = _>>,
|
|
||||||
client_addr,
|
|
||||||
)
|
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok((listen, new_addr))
|
Ok((listen, new_addr))
|
||||||
@ -161,7 +159,7 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let dial = inner_dial.into_future().and_then(move |connec| {
|
let dial = inner_dial.into_future().and_then(move |(connec, client_addr)| {
|
||||||
// We pass a dummy address to `ClientBuilder` because it is never used anywhere
|
// We pass a dummy address to `ClientBuilder` because it is never used anywhere
|
||||||
// in the negotiation anyway, and we use `async_connect_on` to pass a stream.
|
// in the negotiation anyway, and we use `async_connect_on` to pass a stream.
|
||||||
ClientBuilder::new(if is_wss { "wss://127.0.0.1" } else { "ws://127.0.0.1" })
|
ClientBuilder::new(if is_wss { "wss://127.0.0.1" } else { "ws://127.0.0.1" })
|
||||||
@ -187,6 +185,7 @@ where
|
|||||||
let read_write = RwStreamSink::new(framed_data);
|
let read_write = RwStreamSink::new(framed_data);
|
||||||
Box::new(read_write) as Box<AsyncStream>
|
Box::new(read_write) as Box<AsyncStream>
|
||||||
})
|
})
|
||||||
|
.map(|c| (c, client_addr))
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(Box::new(dial) as Box<_>)
|
Ok(Box::new(dial) as Box<_>)
|
||||||
@ -235,8 +234,8 @@ mod tests {
|
|||||||
let listener = listener
|
let listener = listener
|
||||||
.into_future()
|
.into_future()
|
||||||
.map_err(|(e, _)| e)
|
.map_err(|(e, _)| e)
|
||||||
.and_then(|(c, _)| c.unwrap().0);
|
.and_then(|(c, _)| c.unwrap().map(|v| v.0));
|
||||||
let dialer = ws_config.clone().dial(addr).unwrap();
|
let dialer = ws_config.clone().dial(addr).unwrap().map(|v| v.0);
|
||||||
|
|
||||||
let future = listener
|
let future = listener
|
||||||
.select(dialer)
|
.select(dialer)
|
||||||
@ -259,8 +258,8 @@ mod tests {
|
|||||||
let listener = listener
|
let listener = listener
|
||||||
.into_future()
|
.into_future()
|
||||||
.map_err(|(e, _)| e)
|
.map_err(|(e, _)| e)
|
||||||
.and_then(|(c, _)| c.unwrap().0);
|
.and_then(|(c, _)| c.unwrap().map(|v| v.0));
|
||||||
let dialer = ws_config.clone().dial(addr).unwrap();
|
let dialer = ws_config.clone().dial(addr).unwrap().map(|v| v.0);
|
||||||
|
|
||||||
let future = listener
|
let future = listener
|
||||||
.select(dialer)
|
.select(dialer)
|
||||||
|
@ -53,7 +53,7 @@ fn client_to_server_outbound() {
|
|||||||
let future = listener
|
let future = listener
|
||||||
.into_future()
|
.into_future()
|
||||||
.map_err(|(err, _)| err)
|
.map_err(|(err, _)| err)
|
||||||
.and_then(|(client, _)| client.unwrap().0)
|
.and_then(|(client, _)| client.unwrap().map(|v| v.0))
|
||||||
.and_then(|client| client.outbound())
|
.and_then(|client| client.outbound())
|
||||||
.map(|client| Framed::<_, bytes::BytesMut>::new(client))
|
.map(|client| Framed::<_, bytes::BytesMut>::new(client))
|
||||||
.and_then(|client| {
|
.and_then(|client| {
|
||||||
@ -75,7 +75,7 @@ fn client_to_server_outbound() {
|
|||||||
.with_upgrade(multiplex::MultiplexConfig);
|
.with_upgrade(multiplex::MultiplexConfig);
|
||||||
|
|
||||||
let future = transport.dial(rx.recv().unwrap()).unwrap()
|
let future = transport.dial(rx.recv().unwrap()).unwrap()
|
||||||
.and_then(|client| client.inbound())
|
.and_then(|client| client.0.inbound())
|
||||||
.map(|server| Framed::<_, bytes::BytesMut>::new(server))
|
.map(|server| Framed::<_, bytes::BytesMut>::new(server))
|
||||||
.and_then(|server| server.send("hello world".into()))
|
.and_then(|server| server.send("hello world".into()))
|
||||||
.map(|_| ());
|
.map(|_| ());
|
||||||
@ -102,7 +102,7 @@ fn client_to_server_inbound() {
|
|||||||
let future = listener
|
let future = listener
|
||||||
.into_future()
|
.into_future()
|
||||||
.map_err(|(err, _)| err)
|
.map_err(|(err, _)| err)
|
||||||
.and_then(|(client, _)| client.unwrap().0)
|
.and_then(|(client, _)| client.unwrap().map(|v| v.0))
|
||||||
.and_then(|client| client.inbound())
|
.and_then(|client| client.inbound())
|
||||||
.map(|client| Framed::<_, bytes::BytesMut>::new(client))
|
.map(|client| Framed::<_, bytes::BytesMut>::new(client))
|
||||||
.and_then(|client| {
|
.and_then(|client| {
|
||||||
@ -124,7 +124,7 @@ fn client_to_server_inbound() {
|
|||||||
.with_upgrade(multiplex::MultiplexConfig);
|
.with_upgrade(multiplex::MultiplexConfig);
|
||||||
|
|
||||||
let future = transport.dial(rx.recv().unwrap()).unwrap()
|
let future = transport.dial(rx.recv().unwrap()).unwrap()
|
||||||
.and_then(|client| client.outbound())
|
.and_then(|(client, _)| client.outbound())
|
||||||
.map(|server| Framed::<_, bytes::BytesMut>::new(server))
|
.map(|server| Framed::<_, bytes::BytesMut>::new(server))
|
||||||
.and_then(|server| server.send("hello world".into()))
|
.and_then(|server| server.send("hello world".into()))
|
||||||
.map(|_| ());
|
.map(|_| ());
|
||||||
|
@ -52,11 +52,30 @@ impl fmt::Display for Multiaddr {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Multiaddr {
|
impl Multiaddr {
|
||||||
|
/// Returns the raw bytes representation of the multiaddr.
|
||||||
|
#[inline]
|
||||||
|
pub fn into_bytes(self) -> Vec<u8> {
|
||||||
|
self.bytes
|
||||||
|
}
|
||||||
|
|
||||||
/// Return a copy to disallow changing the bytes directly
|
/// Return a copy to disallow changing the bytes directly
|
||||||
pub fn to_bytes(&self) -> Vec<u8> {
|
pub fn to_bytes(&self) -> Vec<u8> {
|
||||||
self.bytes.to_owned()
|
self.bytes.to_owned()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Produces a `Multiaddr` from its bytes representation.
|
||||||
|
pub fn from_bytes(bytes: Vec<u8>) -> Result<Multiaddr> {
|
||||||
|
{
|
||||||
|
let mut ptr = &bytes[..];
|
||||||
|
while !ptr.is_empty() {
|
||||||
|
let (_, new_ptr) = AddrComponent::from_bytes(ptr)?;
|
||||||
|
ptr = new_ptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Multiaddr { bytes })
|
||||||
|
}
|
||||||
|
|
||||||
/// Extracts a slice containing the entire underlying vector.
|
/// Extracts a slice containing the entire underlying vector.
|
||||||
pub fn as_slice(&self) -> &[u8] {
|
pub fn as_slice(&self) -> &[u8] {
|
||||||
&self.bytes
|
&self.bytes
|
||||||
|
@ -339,6 +339,8 @@ impl AddrComponent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Builds an `AddrComponent` from an array that starts with a bytes representation. On
|
||||||
|
/// success, also returns the rest of the slice.
|
||||||
pub fn from_bytes(input: &[u8]) -> Result<(AddrComponent, &[u8])> {
|
pub fn from_bytes(input: &[u8]) -> Result<(AddrComponent, &[u8])> {
|
||||||
let (proto_num, proto_id_len) = u64::decode_var(input); // TODO: will panic if ID too large
|
let (proto_num, proto_id_len) = u64::decode_var(input); // TODO: will panic if ID too large
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user