Merge pull request #95 from tomaka/identify-correct-report

Correctly report the address of the dialer in the identify protocol
This commit is contained in:
Pierre Krieger 2018-01-15 13:08:32 +01:00 committed by GitHub
commit 047e33023d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 62 additions and 51 deletions

View File

@ -49,17 +49,17 @@ mod structs_proto;
/// Prototype for an upgrade to the identity protocol. /// Prototype for an upgrade to the identity protocol.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct IdentifyProtocol { pub struct IdentifyProtocol {
pub information: IdentifyInfo, /// Our public key to report to the remote.
} pub public_key: Vec<u8>,
/// Version of the "global" protocol, eg. `ipfs/1.0.0` or `polkadot/1.0.0`.
impl IdentifyProtocol { pub protocol_version: String,
/// Builds a new `IdentifyProtocol`. /// Name and version of the client. Can be thought as similar to the `User-Agent` header
#[inline] /// of HTTP.
pub fn new(information: IdentifyInfo) -> IdentifyProtocol { pub agent_version: String,
IdentifyProtocol { /// Addresses that we are listening on.
information pub listen_addrs: Vec<Multiaddr>,
} /// Protocols supported by us.
} pub protocols: Vec<String>,
} }
/// Information sent from the listener to the dialer. /// Information sent from the listener to the dialer.
@ -67,13 +67,14 @@ impl IdentifyProtocol {
pub struct IdentifyInfo { pub struct IdentifyInfo {
/// Public key of the node. /// Public key of the node.
pub public_key: Vec<u8>, pub public_key: Vec<u8>,
/// Version of the "global" protocol, eg. `ipfs/1.0.0`. /// Version of the "global" protocol, eg. `ipfs/1.0.0` or `polkadot/1.0.0`.
pub protocol_version: String, pub protocol_version: String,
/// Name and version. Can be thought as similar to the `User-Agent` header of HTTP. /// Name and version of the client. Can be thought as similar to the `User-Agent` header
/// of HTTP.
pub agent_version: String, pub agent_version: String,
/// Addresses that are listened on. /// Addresses that the remote is listening on.
pub listen_addrs: Vec<Multiaddr>, pub listen_addrs: Vec<Multiaddr>,
/// Address that the server uses to communicate with the dialer. /// Our own address as reported by the remote.
pub observed_addr: Multiaddr, pub observed_addr: Multiaddr,
/// Protocols supported by the remote. /// Protocols supported by the remote.
pub protocols: Vec<String>, pub protocols: Vec<String>,
@ -92,7 +93,7 @@ impl<C> ConnectionUpgrade<C> for IdentifyProtocol
iter::once((Bytes::from("/ipfs/id/1.0.0"), ())) iter::once((Bytes::from("/ipfs/id/1.0.0"), ()))
} }
fn upgrade(self, socket: C, _: (), ty: Endpoint) -> Self::Future { fn upgrade(self, socket: C, _: (), ty: Endpoint, remote_addr: &Multiaddr) -> Self::Future {
// TODO: use jack's varint library instead // TODO: use jack's varint library instead
let socket = length_delimited::Builder::new().length_field_length(1).new_framed(socket); let socket = length_delimited::Builder::new().length_field_length(1).new_framed(socket);
@ -111,20 +112,18 @@ impl<C> ConnectionUpgrade<C> for IdentifyProtocol
} }
Endpoint::Listener => { Endpoint::Listener => {
let info = self.information; let listen_addrs = self.listen_addrs
let listen_addrs = info.listen_addrs
.into_iter() .into_iter()
.map(|addr| addr.to_string().into_bytes()) .map(|addr| addr.to_string().into_bytes())
.collect(); .collect();
let mut message = structs_proto::Identify::new(); let mut message = structs_proto::Identify::new();
message.set_agentVersion(info.agent_version); message.set_agentVersion(self.agent_version);
message.set_protocolVersion(info.protocol_version); message.set_protocolVersion(self.protocol_version);
message.set_publicKey(info.public_key); message.set_publicKey(self.public_key);
message.set_listenAddrs(listen_addrs); message.set_listenAddrs(listen_addrs);
message.set_observedAddr(info.observed_addr.to_string().into_bytes()); message.set_observedAddr(remote_addr.to_string().into_bytes());
message.set_protocols(RepeatedField::from_vec(info.protocols)); message.set_protocols(RepeatedField::from_vec(self.protocols));
let bytes = message.write_to_bytes() let bytes = message.write_to_bytes()
.expect("writing protobuf failed ; should never happen"); .expect("writing protobuf failed ; should never happen");
@ -189,7 +188,6 @@ mod tests {
use self::libp2p_tcp_transport::TcpConfig; use self::libp2p_tcp_transport::TcpConfig;
use self::tokio_core::reactor::Core; use self::tokio_core::reactor::Core;
use IdentifyInfo;
use IdentifyProtocol; use IdentifyProtocol;
use futures::{IntoFuture, Future, Stream}; use futures::{IntoFuture, Future, Stream};
use libp2p_swarm::Transport; use libp2p_swarm::Transport;
@ -198,14 +196,13 @@ mod tests {
fn basic() { fn basic() {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle()); let tcp = TcpConfig::new(core.handle());
let with_proto = tcp.with_upgrade(IdentifyProtocol::new(IdentifyInfo { let with_proto = tcp.with_upgrade(IdentifyProtocol {
public_key: vec![1, 2, 3, 4], public_key: vec![1, 2, 3, 4],
protocol_version: "ipfs/1.0.0".to_owned(), protocol_version: "ipfs/1.0.0".to_owned(),
agent_version: "agent/version".to_owned(), agent_version: "agent/version".to_owned(),
listen_addrs: vec!["/ip4/5.6.7.8/tcp/12345".parse().unwrap()], listen_addrs: vec!["/ip4/5.6.7.8/tcp/12345".parse().unwrap()],
observed_addr: "/ip4/1.2.3.4/tcp/9876".parse().unwrap(),
protocols: vec!["ping".to_owned(), "kad".to_owned()], protocols: vec!["ping".to_owned(), "kad".to_owned()],
})); });
let (server, addr) = with_proto.clone() let (server, addr) = with_proto.clone()
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())

View File

@ -90,6 +90,7 @@ use bytes::{Bytes, BytesMut, BufMut};
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
use futures::future::{FutureResult, IntoFuture, loop_fn, Loop}; use futures::future::{FutureResult, IntoFuture, loop_fn, Loop};
use futures::sync::{mpsc, oneshot}; use futures::sync::{mpsc, oneshot};
use libp2p_swarm::Multiaddr;
use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint}; use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint};
use parking_lot::Mutex; use parking_lot::Mutex;
use rand::Rand; use rand::Rand;
@ -124,7 +125,7 @@ impl<C> ConnectionUpgrade<C> for Ping
type Future = FutureResult<Self::Output, IoError>; type Future = FutureResult<Self::Output, IoError>;
#[inline] #[inline]
fn upgrade(self, socket: C, _: Self::UpgradeIdentifier, _: Endpoint) fn upgrade(self, socket: C, _: Self::UpgradeIdentifier, _: Endpoint, _: &Multiaddr)
-> Self::Future -> Self::Future
{ {
// # How does it work? // # How does it work?
@ -277,15 +278,19 @@ mod tests {
let server = listener.incoming() let server = listener.incoming()
.into_future() .into_future()
.map_err(|(e, _)| e.into()) .map_err(|(e, _)| e.into())
.and_then(|(c, _)| Ping.upgrade(c.unwrap().0, (), .and_then(|(c, _)| {
Endpoint::Listener)) Ping.upgrade(c.unwrap().0, (), Endpoint::Listener,
&"/ip4/127.0.0.1/tcp/10000".parse().unwrap())
})
.and_then(|(mut pinger, service)| { .and_then(|(mut pinger, service)| {
pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
}); });
let client = TcpStream::connect(&listener_addr, &core.handle()) let client = TcpStream::connect(&listener_addr, &core.handle())
.map_err(|e| e.into()) .map_err(|e| e.into())
.and_then(|c| Ping.upgrade(c, (), Endpoint::Dialer)) .and_then(|c| {
Ping.upgrade(c, (), Endpoint::Dialer, &"/ip4/127.0.0.1/tcp/10000".parse().unwrap())
})
.and_then(|(mut pinger, service)| { .and_then(|(mut pinger, service)| {
pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
}); });
@ -304,13 +309,16 @@ mod tests {
let server = listener.incoming() let server = listener.incoming()
.into_future() .into_future()
.map_err(|(e, _)| e.into()) .map_err(|(e, _)| e.into())
.and_then(|(c, _)| Ping.upgrade(c.unwrap().0, (), .and_then(|(c, _)| {
Endpoint::Listener)) Ping.upgrade(c.unwrap().0, (), Endpoint::Listener,
&"/ip4/127.0.0.1/tcp/10000".parse().unwrap())
})
.and_then(|(_, service)| service.map_err(|_| panic!())); .and_then(|(_, service)| service.map_err(|_| panic!()));
let client = TcpStream::connect(&listener_addr, &core.handle()) let client = TcpStream::connect(&listener_addr, &core.handle())
.map_err(|e| e.into()) .map_err(|e| e.into())
.and_then(|c| Ping.upgrade(c, (), Endpoint::Dialer)) .and_then(|c| Ping.upgrade(c, (), Endpoint::Dialer,
&"/ip4/127.0.0.1/tcp/1000".parse().unwrap()))
.and_then(|(mut pinger, service)| { .and_then(|(mut pinger, service)| {
let pings = (0 .. 20).map(move |_| { let pings = (0 .. 20).map(move |_| {
pinger.ping().map_err(|_| ()) pinger.ping().map_err(|_| ())

View File

@ -95,6 +95,7 @@ pub use self::error::SecioError;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Future, Poll, StartSend, Sink, Stream}; use futures::{Future, Poll, StartSend, Sink, Stream};
use futures::stream::MapErr as StreamMapErr; use futures::stream::MapErr as StreamMapErr;
use libp2p_swarm::Multiaddr;
use ring::signature::RSAKeyPair; use ring::signature::RSAKeyPair;
use rw_stream_sink::RwStreamSink; use rw_stream_sink::RwStreamSink;
use std::error::Error; use std::error::Error;
@ -197,7 +198,7 @@ impl<S> libp2p_swarm::ConnectionUpgrade<S> for SecioConfig
} }
#[inline] #[inline]
fn upgrade(self, incoming: S, _: (), _: libp2p_swarm::Endpoint) -> Self::Future { fn upgrade(self, incoming: S, _: (), _: libp2p_swarm::Endpoint, _: &Multiaddr) -> Self::Future {
let fut = SecioMiddleware::handshake( let fut = SecioMiddleware::handshake(
incoming, incoming,
self.key, self.key,

View File

@ -297,7 +297,7 @@ where
type Future = FromErr<O::Future, IoError>; type Future = FromErr<O::Future, IoError>;
#[inline] #[inline]
fn upgrade(self, socket: C, _: (), _: Endpoint) -> Self::Future { fn upgrade(self, socket: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
let upgrade = &self.upgrade; let upgrade = &self.upgrade;
upgrade(socket).into_future().from_err() upgrade(socket).into_future().from_err()
} }
@ -522,7 +522,8 @@ pub trait ConnectionUpgrade<C: AsyncRead + AsyncWrite> {
/// ///
/// Because performing the upgrade may not be instantaneous (eg. it may require a handshake), /// Because performing the upgrade may not be instantaneous (eg. it may require a handshake),
/// this function returns a future instead of the direct output. /// this function returns a future instead of the direct output.
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) -> Self::Future; fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint,
remote_addr: &Multiaddr) -> Self::Future;
} }
/// Type of connection for the upgrade. /// Type of connection for the upgrade.
@ -552,7 +553,7 @@ impl<C> ConnectionUpgrade<C> for DeniedConnectionUpgrade
} }
#[inline] #[inline]
fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint) -> Self::Future { fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint, _: &Multiaddr) -> Self::Future {
unreachable!("the denied connection upgrade always fails to negotiate") unreachable!("the denied connection upgrade always fails to negotiate")
} }
} }
@ -597,13 +598,15 @@ where
type Future = EitherConnUpgrFuture<A::Future, B::Future>; type Future = EitherConnUpgrFuture<A::Future, B::Future>;
#[inline] #[inline]
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) -> Self::Future { fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint,
remote_addr: &Multiaddr) -> Self::Future
{
match id { match id {
EitherUpgradeIdentifier::First(id) => { EitherUpgradeIdentifier::First(id) => {
EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty)) EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty, remote_addr))
} }
EitherUpgradeIdentifier::Second(id) => { EitherUpgradeIdentifier::Second(id) => {
EitherConnUpgrFuture::Second(self.1.upgrade(socket, id, ty)) EitherConnUpgrFuture::Second(self.1.upgrade(socket, id, ty, remote_addr))
} }
} }
} }
@ -709,7 +712,7 @@ where
type NamesIter = iter::Once<(Bytes, ())>; type NamesIter = iter::Once<(Bytes, ())>;
#[inline] #[inline]
fn upgrade(self, i: C, _: (), _: Endpoint) -> Self::Future { fn upgrade(self, i: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
future::ok(i) future::ok(i)
} }
@ -800,7 +803,7 @@ where
) -> Result<Box<Future<Item = C::Output, Error = IoError> + 'a>, (Self, Multiaddr)> { ) -> Result<Box<Future<Item = C::Output, Error = IoError> + 'a>, (Self, Multiaddr)> {
let upgrade = self.upgrade; let upgrade = self.upgrade;
let dialed_fut = match self.transports.dial(addr) { let dialed_fut = match self.transports.dial(addr.clone()) {
Ok(f) => f.into_future(), Ok(f) => f.into_future(),
Err((trans, addr)) => { Err((trans, addr)) => {
let builder = UpgradedNode { let builder = UpgradedNode {
@ -821,8 +824,8 @@ where
.map_err(|err| IoError::new(IoErrorKind::Other, err)); .map_err(|err| IoError::new(IoErrorKind::Other, err));
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade)) negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade))
}) })
.and_then(|(upgrade_id, connection, upgrade)| { .and_then(move |(upgrade_id, connection, upgrade)| {
upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr)
}); });
Ok(Box::new(future)) Ok(Box::new(future))
@ -850,7 +853,7 @@ where
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr))
}) })
.and_then(|(upgrade_id, connection, upgrade, addr)| { .and_then(|(upgrade_id, connection, upgrade, addr)| {
upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr)
.map(|u| (u, addr)) .map(|u| (u, addr))
}); });
@ -895,6 +898,7 @@ where
let stream = listening_stream let stream = listening_stream
.map(move |(connection, client_addr)| { .map(move |(connection, client_addr)| {
let upgrade = upgrade.clone(); let upgrade = upgrade.clone();
let remote_addr = client_addr.clone();
let connection = connection let connection = connection
// Try to negotiate the protocol // Try to negotiate the protocol
.and_then(move |connection| { .and_then(move |connection| {
@ -902,8 +906,9 @@ where
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t)); .map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
multistream_select::listener_select_proto(connection, iter) multistream_select::listener_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err)) .map_err(|err| IoError::new(IoErrorKind::Other, err))
.and_then(|(upgrade_id, connection)| { .and_then(move |(upgrade_id, connection)| {
upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) upgrade.upgrade(connection, upgrade_id, Endpoint::Listener,
&remote_addr)
}) })
.into_future() .into_future()
}); });

View File

@ -42,7 +42,7 @@ use futures::{Async, Future, Poll};
use futures::future::{self, FutureResult}; use futures::future::{self, FutureResult};
use header::MultiplexHeader; use header::MultiplexHeader;
use swarm::muxing::StreamMuxer; use swarm::muxing::StreamMuxer;
use swarm::{ConnectionUpgrade, Endpoint}; use swarm::{ConnectionUpgrade, Endpoint, Multiaddr};
use futures_mutex::Mutex; use futures_mutex::Mutex;
use read::{read_stream, MultiplexReadState}; use read::{read_stream, MultiplexReadState};
use shared::{buf_from_slice, ByteBuf, MultiplexShared}; use shared::{buf_from_slice, ByteBuf, MultiplexShared};
@ -348,7 +348,7 @@ where
type NamesIter = iter::Once<(Bytes, ())>; type NamesIter = iter::Once<(Bytes, ())>;
#[inline] #[inline]
fn upgrade(self, i: C, _: (), end: Endpoint) -> Self::Future { fn upgrade(self, i: C, _: (), end: Endpoint, _: &Multiaddr) -> Self::Future {
future::ok(Multiplex::new(i, end)) future::ok(Multiplex::new(i, end))
} }