diff --git a/Cargo.toml b/Cargo.toml index 85dd7987..341e829d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ members = [ "protocols/identify", "protocols/kad", "protocols/ping", + "protocols/observed", "transports/relay", "protocols/secio", "muxers/mplex", diff --git a/core/src/upgrade/traits.rs b/core/src/upgrade/traits.rs index 82efde57..16493e4c 100644 --- a/core/src/upgrade/traits.rs +++ b/core/src/upgrade/traits.rs @@ -75,10 +75,5 @@ pub trait ConnectionUpgrade { /// /// Because performing the upgrade may not be instantaneous (eg. it may require a handshake), /// this function returns a future instead of the direct output. - fn upgrade( - self, - socket: C, - id: Self::UpgradeIdentifier, - ty: Endpoint, - ) -> Self::Future; + fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) -> Self::Future; } diff --git a/protocols/observed/Cargo.toml b/protocols/observed/Cargo.toml new file mode 100644 index 00000000..74f08a88 --- /dev/null +++ b/protocols/observed/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "libp2p-observed-address" +version = "0.1.0" +authors = ["Parity Technologies "] +license = "MIT" + +[dependencies] +bytes = "0.4" +futures = "0.1" +libp2p-core = { path = "../../core" } +tokio-codec = "0.1" +tokio-io = "0.1" +unsigned-varint = { version = "0.2.1", features = ["codec"] } + +[dev-dependencies] +tokio = "0.1" diff --git a/protocols/observed/src/lib.rs b/protocols/observed/src/lib.rs new file mode 100644 index 00000000..be17b1a4 --- /dev/null +++ b/protocols/observed/src/lib.rs @@ -0,0 +1,158 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Connection upgrade to allow retrieving the externally visible address (as dialer) or +//! to report the externally visible address (as listener). + +extern crate bytes; +extern crate futures; +extern crate libp2p_core; +extern crate tokio_codec; +extern crate tokio_io; +extern crate unsigned_varint; + +use bytes::Bytes; +use futures::{future, prelude::*}; +use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr}; +use std::{io, iter}; +use tokio_codec::{FramedRead, FramedWrite}; +use tokio_io::{AsyncRead, AsyncWrite}; +use unsigned_varint::codec::UviBytes; + +/// The output, this connection upgrade produces. +pub enum Output { + /// As `Dialer`, we get our own externally observed address. + Address(Multiaddr), + /// As `Listener`, we return a sender which allows reporting the observed + /// address the client. + Sender(Sender) +} + +/// `Sender` allows reporting back the observed address to the remote endpoint. +pub struct Sender { + io: FramedWrite +} + +impl Sender { + /// Send address `a` to remote as the observed address. + pub fn send_address(self, a: Multiaddr) -> impl Future { + self.io.send(Bytes::from(a.into_bytes())).map(|_io| ()) + } +} + +/// The connection upgrade type to retrieve or report externally visible addresses. +pub struct Observed {} + +impl Observed { + pub fn new() -> Self { + Observed {} + } +} + +impl ConnectionUpgrade for Observed +where + C: AsyncRead + AsyncWrite + Send + 'static +{ + type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; + type UpgradeIdentifier = (); + type Output = Output; + type Future = Box + Send>; + + fn protocol_names(&self) -> Self::NamesIter { + iter::once((Bytes::from("/paritytech/observed-address/0.1.0"), ())) + } + + fn upgrade(self, conn: C, _: (), role: Endpoint) -> Self::Future { + match role { + Endpoint::Dialer => { + let io = FramedRead::new(conn, UviBytes::default()); + let future = io.into_future() + .map_err(|(e, _): (io::Error, FramedRead)| e) + .and_then(move |(bytes, _)| { + if let Some(b) = bytes { + let ma = Multiaddr::from_bytes(b.to_vec()) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + Ok(Output::Address(ma)) + } else { + Err(io::ErrorKind::InvalidData.into()) + } + }); + Box::new(future) + } + Endpoint::Listener => { + let io = FramedWrite::new(conn, UviBytes::default()); + Box::new(future::ok(Output::Sender(Sender { io }))) + } + } + } +} + +#[cfg(test)] +mod tests { + extern crate tokio; + + use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr}; + use self::tokio::runtime::current_thread; + use self::tokio::net::{TcpListener, TcpStream}; + use super::*; + + #[test] + fn observed_address() { + let server = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let server_addr = server.local_addr().unwrap(); + + let observed_addr1: Multiaddr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap(); + let observed_addr2 = observed_addr1.clone(); + + let server = server.incoming() + .into_future() + .map_err(|(e, _)| e.into()) + .and_then(move |(conn, _)| { + Observed::new().upgrade(conn.unwrap(), (), Endpoint::Listener) + }) + .and_then(move |output| { + match output { + Output::Sender(s) => s.send_address(observed_addr1), + Output::Address(_) => unreachable!() + } + }); + + let client = TcpStream::connect(&server_addr) + .map_err(|e| e.into()) + .and_then(|conn| { + Observed::new().upgrade(conn, (), Endpoint::Dialer) + }) + .map(move |output| { + match output { + Output::Address(addr) => { + eprintln!("{} {}", addr, observed_addr2); + assert_eq!(addr, observed_addr2) + } + _ => unreachable!() + } + }); + + current_thread::block_on_all(future::lazy(move || { + current_thread::spawn(server.map_err(|e| panic!("server error: {}", e)).map(|_| ())); + client.map_err(|e| panic!("client error: {}", e)) + })) + .unwrap(); + } +}