From 68c8627597b2edd11a6d05e9f2a7ba9a83473bec Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 10 Jan 2018 17:35:22 +0100 Subject: [PATCH] Correctly report the address of the dialer in identify --- libp2p-identify/src/lib.rs | 53 +++++++++++++++++------------------ libp2p-ping/src/lib.rs | 22 ++++++++++----- libp2p-secio/src/lib.rs | 3 +- libp2p-swarm/src/transport.rs | 31 +++++++++++--------- multiplex-rs/src/lib.rs | 4 +-- 5 files changed, 62 insertions(+), 51 deletions(-) diff --git a/libp2p-identify/src/lib.rs b/libp2p-identify/src/lib.rs index 54d4c57e..37c832aa 100644 --- a/libp2p-identify/src/lib.rs +++ b/libp2p-identify/src/lib.rs @@ -49,17 +49,17 @@ mod structs_proto; /// Prototype for an upgrade to the identity protocol. #[derive(Debug, Clone)] pub struct IdentifyProtocol { - pub information: IdentifyInfo, -} - -impl IdentifyProtocol { - /// Builds a new `IdentifyProtocol`. - #[inline] - pub fn new(information: IdentifyInfo) -> IdentifyProtocol { - IdentifyProtocol { - information - } - } + /// Our public key to report to the remote. + pub public_key: Vec, + /// Version of the "global" protocol, eg. `ipfs/1.0.0` or `polkadot/1.0.0`. + pub protocol_version: String, + /// Name and version of the client. Can be thought as similar to the `User-Agent` header + /// of HTTP. + pub agent_version: String, + /// Addresses that we are listening on. + pub listen_addrs: Vec, + /// Protocols supported by us. + pub protocols: Vec, } /// Information sent from the listener to the dialer. @@ -67,13 +67,14 @@ impl IdentifyProtocol { pub struct IdentifyInfo { /// Public key of the node. pub public_key: Vec, - /// Version of the "global" protocol, eg. `ipfs/1.0.0`. + /// Version of the "global" protocol, eg. `ipfs/1.0.0` or `polkadot/1.0.0`. pub protocol_version: String, - /// Name and version. Can be thought as similar to the `User-Agent` header of HTTP. + /// Name and version of the client. Can be thought as similar to the `User-Agent` header + /// of HTTP. pub agent_version: String, - /// Addresses that are listened on. + /// Addresses that the remote is listening on. pub listen_addrs: Vec, - /// Address that the server uses to communicate with the dialer. + /// Our own address as reported by the remote. pub observed_addr: Multiaddr, /// Protocols supported by the remote. pub protocols: Vec, @@ -92,7 +93,7 @@ impl ConnectionUpgrade for IdentifyProtocol iter::once((Bytes::from("/ipfs/id/1.0.0"), ())) } - fn upgrade(self, socket: C, _: (), ty: Endpoint) -> Self::Future { + fn upgrade(self, socket: C, _: (), ty: Endpoint, remote_addr: &Multiaddr) -> Self::Future { // TODO: use jack's varint library instead let socket = length_delimited::Builder::new().length_field_length(1).new_framed(socket); @@ -111,20 +112,18 @@ impl ConnectionUpgrade for IdentifyProtocol } Endpoint::Listener => { - let info = self.information; - - let listen_addrs = info.listen_addrs + let listen_addrs = self.listen_addrs .into_iter() .map(|addr| addr.to_string().into_bytes()) .collect(); let mut message = structs_proto::Identify::new(); - message.set_agentVersion(info.agent_version); - message.set_protocolVersion(info.protocol_version); - message.set_publicKey(info.public_key); + message.set_agentVersion(self.agent_version); + message.set_protocolVersion(self.protocol_version); + message.set_publicKey(self.public_key); message.set_listenAddrs(listen_addrs); - message.set_observedAddr(info.observed_addr.to_string().into_bytes()); - message.set_protocols(RepeatedField::from_vec(info.protocols)); + message.set_observedAddr(remote_addr.to_string().into_bytes()); + message.set_protocols(RepeatedField::from_vec(self.protocols)); let bytes = message.write_to_bytes() .expect("writing protobuf failed ; should never happen"); @@ -189,7 +188,6 @@ mod tests { use self::libp2p_tcp_transport::TcpConfig; use self::tokio_core::reactor::Core; - use IdentifyInfo; use IdentifyProtocol; use futures::{IntoFuture, Future, Stream}; use libp2p_swarm::Transport; @@ -198,14 +196,13 @@ mod tests { fn basic() { let mut core = Core::new().unwrap(); let tcp = TcpConfig::new(core.handle()); - let with_proto = tcp.with_upgrade(IdentifyProtocol::new(IdentifyInfo { + let with_proto = tcp.with_upgrade(IdentifyProtocol { public_key: vec![1, 2, 3, 4], protocol_version: "ipfs/1.0.0".to_owned(), agent_version: "agent/version".to_owned(), listen_addrs: vec!["/ip4/5.6.7.8/tcp/12345".parse().unwrap()], - observed_addr: "/ip4/1.2.3.4/tcp/9876".parse().unwrap(), protocols: vec!["ping".to_owned(), "kad".to_owned()], - })); + }); let (server, addr) = with_proto.clone() .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) diff --git a/libp2p-ping/src/lib.rs b/libp2p-ping/src/lib.rs index 21c64ea7..55ece17e 100644 --- a/libp2p-ping/src/lib.rs +++ b/libp2p-ping/src/lib.rs @@ -90,6 +90,7 @@ use bytes::{Bytes, BytesMut, BufMut}; use futures::{Future, Sink, Stream}; use futures::future::{FutureResult, IntoFuture, loop_fn, Loop}; use futures::sync::{mpsc, oneshot}; +use libp2p_swarm::Multiaddr; use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint}; use parking_lot::Mutex; use rand::Rand; @@ -124,7 +125,7 @@ impl ConnectionUpgrade for Ping type Future = FutureResult; #[inline] - fn upgrade(self, socket: C, _: Self::UpgradeIdentifier, _: Endpoint) + fn upgrade(self, socket: C, _: Self::UpgradeIdentifier, _: Endpoint, _: &Multiaddr) -> Self::Future { // # How does it work? @@ -277,15 +278,19 @@ mod tests { let server = listener.incoming() .into_future() .map_err(|(e, _)| e.into()) - .and_then(|(c, _)| Ping.upgrade(c.unwrap().0, (), - Endpoint::Listener)) + .and_then(|(c, _)| { + Ping.upgrade(c.unwrap().0, (), Endpoint::Listener, + &"/ip4/127.0.0.1/tcp/10000".parse().unwrap()) + }) .and_then(|(mut pinger, service)| { pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) }); let client = TcpStream::connect(&listener_addr, &core.handle()) .map_err(|e| e.into()) - .and_then(|c| Ping.upgrade(c, (), Endpoint::Dialer)) + .and_then(|c| { + Ping.upgrade(c, (), Endpoint::Dialer, &"/ip4/127.0.0.1/tcp/10000".parse().unwrap()) + }) .and_then(|(mut pinger, service)| { pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) }); @@ -304,13 +309,16 @@ mod tests { let server = listener.incoming() .into_future() .map_err(|(e, _)| e.into()) - .and_then(|(c, _)| Ping.upgrade(c.unwrap().0, (), - Endpoint::Listener)) + .and_then(|(c, _)| { + Ping.upgrade(c.unwrap().0, (), Endpoint::Listener, + &"/ip4/127.0.0.1/tcp/10000".parse().unwrap()) + }) .and_then(|(_, service)| service.map_err(|_| panic!())); let client = TcpStream::connect(&listener_addr, &core.handle()) .map_err(|e| e.into()) - .and_then(|c| Ping.upgrade(c, (), Endpoint::Dialer)) + .and_then(|c| Ping.upgrade(c, (), Endpoint::Dialer, + &"/ip4/127.0.0.1/tcp/1000".parse().unwrap())) .and_then(|(mut pinger, service)| { let pings = (0 .. 20).map(move |_| { pinger.ping().map_err(|_| ()) diff --git a/libp2p-secio/src/lib.rs b/libp2p-secio/src/lib.rs index 95c0cbcc..1359d6d4 100644 --- a/libp2p-secio/src/lib.rs +++ b/libp2p-secio/src/lib.rs @@ -95,6 +95,7 @@ pub use self::error::SecioError; use bytes::{Bytes, BytesMut}; use futures::{Future, Poll, StartSend, Sink, Stream}; use futures::stream::MapErr as StreamMapErr; +use libp2p_swarm::Multiaddr; use ring::signature::RSAKeyPair; use rw_stream_sink::RwStreamSink; use std::error::Error; @@ -197,7 +198,7 @@ impl libp2p_swarm::ConnectionUpgrade for SecioConfig } #[inline] - fn upgrade(self, incoming: S, _: (), _: libp2p_swarm::Endpoint) -> Self::Future { + fn upgrade(self, incoming: S, _: (), _: libp2p_swarm::Endpoint, _: &Multiaddr) -> Self::Future { let fut = SecioMiddleware::handshake( incoming, self.key, diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index d88dec16..dd3a3440 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -297,7 +297,7 @@ where type Future = FromErr; #[inline] - fn upgrade(self, socket: C, _: (), _: Endpoint) -> Self::Future { + fn upgrade(self, socket: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future { let upgrade = &self.upgrade; upgrade(socket).into_future().from_err() } @@ -522,7 +522,8 @@ pub trait ConnectionUpgrade { /// /// Because performing the upgrade may not be instantaneous (eg. it may require a handshake), /// this function returns a future instead of the direct output. - fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) -> Self::Future; + fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint, + remote_addr: &Multiaddr) -> Self::Future; } /// Type of connection for the upgrade. @@ -552,7 +553,7 @@ impl ConnectionUpgrade for DeniedConnectionUpgrade } #[inline] - fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint) -> Self::Future { + fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint, _: &Multiaddr) -> Self::Future { unreachable!("the denied connection upgrade always fails to negotiate") } } @@ -597,13 +598,15 @@ where type Future = EitherConnUpgrFuture; #[inline] - fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) -> Self::Future { + fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint, + remote_addr: &Multiaddr) -> Self::Future + { match id { EitherUpgradeIdentifier::First(id) => { - EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty)) + EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty, remote_addr)) } EitherUpgradeIdentifier::Second(id) => { - EitherConnUpgrFuture::Second(self.1.upgrade(socket, id, ty)) + EitherConnUpgrFuture::Second(self.1.upgrade(socket, id, ty, remote_addr)) } } } @@ -709,7 +712,7 @@ where type NamesIter = iter::Once<(Bytes, ())>; #[inline] - fn upgrade(self, i: C, _: (), _: Endpoint) -> Self::Future { + fn upgrade(self, i: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future { future::ok(i) } @@ -800,7 +803,7 @@ where ) -> Result + 'a>, (Self, Multiaddr)> { let upgrade = self.upgrade; - let dialed_fut = match self.transports.dial(addr) { + let dialed_fut = match self.transports.dial(addr.clone()) { Ok(f) => f.into_future(), Err((trans, addr)) => { let builder = UpgradedNode { @@ -821,8 +824,8 @@ where .map_err(|err| IoError::new(IoErrorKind::Other, err)); negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade)) }) - .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) + .and_then(move |(upgrade_id, connection, upgrade)| { + upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr) }); Ok(Box::new(future)) @@ -850,7 +853,7 @@ where negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) }) .and_then(|(upgrade_id, connection, upgrade, addr)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) + upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr) .map(|u| (u, addr)) }); @@ -895,6 +898,7 @@ where let stream = listening_stream .map(move |(connection, client_addr)| { let upgrade = upgrade.clone(); + let remote_addr = client_addr.clone(); let connection = connection // Try to negotiate the protocol .and_then(move |connection| { @@ -902,8 +906,9 @@ where .map::<_, fn(_) -> _>(|(n, t)| (n, ::eq, t)); multistream_select::listener_select_proto(connection, iter) .map_err(|err| IoError::new(IoErrorKind::Other, err)) - .and_then(|(upgrade_id, connection)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) + .and_then(move |(upgrade_id, connection)| { + upgrade.upgrade(connection, upgrade_id, Endpoint::Listener, + &remote_addr) }) .into_future() }); diff --git a/multiplex-rs/src/lib.rs b/multiplex-rs/src/lib.rs index 11d9eacf..ca17e29e 100644 --- a/multiplex-rs/src/lib.rs +++ b/multiplex-rs/src/lib.rs @@ -42,7 +42,7 @@ use futures::{Async, Future, Poll}; use futures::future::{self, FutureResult}; use header::MultiplexHeader; use swarm::muxing::StreamMuxer; -use swarm::{ConnectionUpgrade, Endpoint}; +use swarm::{ConnectionUpgrade, Endpoint, Multiaddr}; use futures_mutex::Mutex; use read::{read_stream, MultiplexReadState}; use shared::{buf_from_slice, ByteBuf, MultiplexShared}; @@ -348,7 +348,7 @@ where type NamesIter = iter::Once<(Bytes, ())>; #[inline] - fn upgrade(self, i: C, _: (), end: Endpoint) -> Self::Future { + fn upgrade(self, i: C, _: (), end: Endpoint, _: &Multiaddr) -> Self::Future { future::ok(Multiplex::new(i, end)) }