From 3937649300eb02ade3b2b15da0a0dab35efec74b Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 11 Dec 2017 15:57:21 +0100 Subject: [PATCH] Distinguish between initiator and receiver --- libp2p-ping/src/lib.rs | 18 +++++++++++------- libp2p-secio/src/lib.rs | 2 +- libp2p-swarm/src/lib.rs | 2 +- libp2p-swarm/src/transport.rs | 29 +++++++++++++++++++++-------- 4 files changed, 34 insertions(+), 17 deletions(-) diff --git a/libp2p-ping/src/lib.rs b/libp2p-ping/src/lib.rs index ef7eb909..98c30f29 100644 --- a/libp2p-ping/src/lib.rs +++ b/libp2p-ping/src/lib.rs @@ -90,7 +90,7 @@ use bytes::{Bytes, BytesMut, BufMut}; use futures::{Future, Sink, Stream}; use futures::future::{FutureResult, IntoFuture, loop_fn, Loop}; use futures::sync::{mpsc, oneshot}; -use libp2p_swarm::transport::ConnectionUpgrade; +use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint}; use parking_lot::Mutex; use rand::Rand; use rand::os::OsRng; @@ -124,7 +124,9 @@ impl ConnectionUpgrade for Ping type Future = FutureResult; #[inline] - fn upgrade(self, socket: C, _: Self::UpgradeIdentifier) -> Self::Future { + fn upgrade(self, socket: C, _: Self::UpgradeIdentifier, _: Endpoint) + -> Self::Future + { // # How does it work? // // All the actual processing is performed by the *ponger*. @@ -263,7 +265,7 @@ mod tests { use futures::future::join_all; use futures::Future; use futures::Stream; - use libp2p_swarm::transport::ConnectionUpgrade; + use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint}; #[test] fn ping_pong() { @@ -275,14 +277,15 @@ mod tests { let server = listener.incoming() .into_future() .map_err(|(e, _)| e.into()) - .and_then(|(c, _)| Ping.upgrade(c.unwrap().0, ())) + .and_then(|(c, _)| Ping.upgrade(c.unwrap().0, (), + Endpoint::Listener)) .and_then(|(mut pinger, service)| { pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) }); let client = TcpStream::connect(&listener_addr, &core.handle()) .map_err(|e| e.into()) - .and_then(|c| Ping.upgrade(c, ())) + .and_then(|c| Ping.upgrade(c, (), Endpoint::Dialer)) .and_then(|(mut pinger, service)| { pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) }); @@ -301,12 +304,13 @@ mod tests { let server = listener.incoming() .into_future() .map_err(|(e, _)| e.into()) - .and_then(|(c, _)| Ping.upgrade(c.unwrap().0, ())) + .and_then(|(c, _)| Ping.upgrade(c.unwrap().0, (), + Endpoint::Listener)) .and_then(|(_, service)| service.map_err(|_| panic!())); let client = TcpStream::connect(&listener_addr, &core.handle()) .map_err(|e| e.into()) - .and_then(|c| Ping.upgrade(c, ())) + .and_then(|c| Ping.upgrade(c, (), Endpoint::Dialer)) .and_then(|(mut pinger, service)| { let pings = (0 .. 20).map(move |_| { pinger.ping().map_err(|_| ()) diff --git a/libp2p-secio/src/lib.rs b/libp2p-secio/src/lib.rs index e2b06e6c..7250dc90 100644 --- a/libp2p-secio/src/lib.rs +++ b/libp2p-secio/src/lib.rs @@ -150,7 +150,7 @@ impl libp2p_swarm::ConnectionUpgrade for SecioConfig } #[inline] - fn upgrade(self, incoming: S, _: ()) -> Self::Future { + fn upgrade(self, incoming: S, _: (), _: libp2p_swarm::Endpoint) -> Self::Future { let fut = SecioMiddleware::handshake( incoming, self.key, diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index a787f174..536b5e9e 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -166,4 +166,4 @@ pub use self::connection_reuse::ConnectionReuse; pub use self::multiaddr::Multiaddr; pub use self::muxing::StreamMuxer; pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade}; -pub use self::transport::SimpleProtocol; +pub use self::transport::{Endpoint, SimpleProtocol}; diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index c3e2e499..f98de830 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -215,7 +215,7 @@ impl ConnectionUpgrade for SimpleProtocol type Future = FromErr; #[inline] - fn upgrade(self, socket: C, _: ()) -> Self::Future { + fn upgrade(self, socket: C, _: (), _: Endpoint) -> Self::Future { let upgrade = &self.upgrade; upgrade(socket).into_future().from_err() } @@ -407,7 +407,17 @@ pub trait ConnectionUpgrade { /// /// Because performing the upgrade may not be instantaneous (eg. it may require a handshake), /// this function returns a future instead of the direct output. - fn upgrade(self, socket: C, id: Self::UpgradeIdentifier) -> Self::Future; + fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) + -> Self::Future; +} + +/// Type of connection for the upgrade. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Endpoint { + /// The socket comes from a dialer. + Dialer, + /// The socket comes from a listener. + Listener, } /// See `or_upgrade()`. @@ -434,13 +444,15 @@ impl ConnectionUpgrade for OrUpgrade type Future = EitherConnUpgrFuture; #[inline] - fn upgrade(self, socket: C, id: Self::UpgradeIdentifier) -> Self::Future { + fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) + -> Self::Future + { match id { EitherUpgradeIdentifier::First(id) => { - EitherConnUpgrFuture::First(self.0.upgrade(socket, id)) + EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty)) } EitherUpgradeIdentifier::Second(id) => { - EitherConnUpgrFuture::Second(self.1.upgrade(socket, id)) + EitherConnUpgrFuture::Second(self.1.upgrade(socket, id, ty)) } } } @@ -543,7 +555,7 @@ impl ConnectionUpgrade for PlainTextConfig type NamesIter = iter::Once<(Bytes, ())>; #[inline] - fn upgrade(self, i: C, _: ()) -> Self::Future { + fn upgrade(self, i: C, _: (), _: Endpoint) -> Self::Future { future_ok(i) } @@ -616,7 +628,7 @@ impl<'a, T, C> UpgradedNode negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade)) }) .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id) + upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) }); Ok(Box::new(future)) @@ -666,7 +678,8 @@ impl<'a, T, C> UpgradedNode .map_err(|err| IoError::new(IoErrorKind::Other, err)) }) .and_then(|(upgrade_id, connection, upgrade, client_addr)| { - upgrade.upgrade(connection, upgrade_id).map(|s| (s, client_addr)) + upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) + .map(|s| (s, client_addr)) }); Ok((Box::new(stream), new_addr))