Distinguish between initiator and receiver

This commit is contained in:
Pierre Krieger 2017-12-11 15:57:21 +01:00
parent 5cf86e5191
commit 3937649300
4 changed files with 34 additions and 17 deletions

View File

@ -90,7 +90,7 @@ use bytes::{Bytes, BytesMut, BufMut};
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
use futures::future::{FutureResult, IntoFuture, loop_fn, Loop}; use futures::future::{FutureResult, IntoFuture, loop_fn, Loop};
use futures::sync::{mpsc, oneshot}; use futures::sync::{mpsc, oneshot};
use libp2p_swarm::transport::ConnectionUpgrade; use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint};
use parking_lot::Mutex; use parking_lot::Mutex;
use rand::Rand; use rand::Rand;
use rand::os::OsRng; use rand::os::OsRng;
@ -124,7 +124,9 @@ impl<C> ConnectionUpgrade<C> for Ping
type Future = FutureResult<Self::Output, IoError>; type Future = FutureResult<Self::Output, IoError>;
#[inline] #[inline]
fn upgrade(self, socket: C, _: Self::UpgradeIdentifier) -> Self::Future { fn upgrade(self, socket: C, _: Self::UpgradeIdentifier, _: Endpoint)
-> Self::Future
{
// # How does it work? // # How does it work?
// //
// All the actual processing is performed by the *ponger*. // All the actual processing is performed by the *ponger*.
@ -263,7 +265,7 @@ mod tests {
use futures::future::join_all; use futures::future::join_all;
use futures::Future; use futures::Future;
use futures::Stream; use futures::Stream;
use libp2p_swarm::transport::ConnectionUpgrade; use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint};
#[test] #[test]
fn ping_pong() { fn ping_pong() {
@ -275,14 +277,15 @@ mod tests {
let server = listener.incoming() let server = listener.incoming()
.into_future() .into_future()
.map_err(|(e, _)| e.into()) .map_err(|(e, _)| e.into())
.and_then(|(c, _)| Ping.upgrade(c.unwrap().0, ())) .and_then(|(c, _)| Ping.upgrade(c.unwrap().0, (),
Endpoint::Listener))
.and_then(|(mut pinger, service)| { .and_then(|(mut pinger, service)| {
pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
}); });
let client = TcpStream::connect(&listener_addr, &core.handle()) let client = TcpStream::connect(&listener_addr, &core.handle())
.map_err(|e| e.into()) .map_err(|e| e.into())
.and_then(|c| Ping.upgrade(c, ())) .and_then(|c| Ping.upgrade(c, (), Endpoint::Dialer))
.and_then(|(mut pinger, service)| { .and_then(|(mut pinger, service)| {
pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
}); });
@ -301,12 +304,13 @@ mod tests {
let server = listener.incoming() let server = listener.incoming()
.into_future() .into_future()
.map_err(|(e, _)| e.into()) .map_err(|(e, _)| e.into())
.and_then(|(c, _)| Ping.upgrade(c.unwrap().0, ())) .and_then(|(c, _)| Ping.upgrade(c.unwrap().0, (),
Endpoint::Listener))
.and_then(|(_, service)| service.map_err(|_| panic!())); .and_then(|(_, service)| service.map_err(|_| panic!()));
let client = TcpStream::connect(&listener_addr, &core.handle()) let client = TcpStream::connect(&listener_addr, &core.handle())
.map_err(|e| e.into()) .map_err(|e| e.into())
.and_then(|c| Ping.upgrade(c, ())) .and_then(|c| Ping.upgrade(c, (), Endpoint::Dialer))
.and_then(|(mut pinger, service)| { .and_then(|(mut pinger, service)| {
let pings = (0 .. 20).map(move |_| { let pings = (0 .. 20).map(move |_| {
pinger.ping().map_err(|_| ()) pinger.ping().map_err(|_| ())

View File

@ -150,7 +150,7 @@ impl<S> libp2p_swarm::ConnectionUpgrade<S> for SecioConfig
} }
#[inline] #[inline]
fn upgrade(self, incoming: S, _: ()) -> Self::Future { fn upgrade(self, incoming: S, _: (), _: libp2p_swarm::Endpoint) -> Self::Future {
let fut = SecioMiddleware::handshake( let fut = SecioMiddleware::handshake(
incoming, incoming,
self.key, self.key,

View File

@ -166,4 +166,4 @@ pub use self::connection_reuse::ConnectionReuse;
pub use self::multiaddr::Multiaddr; pub use self::multiaddr::Multiaddr;
pub use self::muxing::StreamMuxer; pub use self::muxing::StreamMuxer;
pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade}; pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade};
pub use self::transport::SimpleProtocol; pub use self::transport::{Endpoint, SimpleProtocol};

View File

@ -215,7 +215,7 @@ impl<C, F, O> ConnectionUpgrade<C> for SimpleProtocol<F>
type Future = FromErr<O::Future, IoError>; type Future = FromErr<O::Future, IoError>;
#[inline] #[inline]
fn upgrade(self, socket: C, _: ()) -> Self::Future { fn upgrade(self, socket: C, _: (), _: Endpoint) -> Self::Future {
let upgrade = &self.upgrade; let upgrade = &self.upgrade;
upgrade(socket).into_future().from_err() upgrade(socket).into_future().from_err()
} }
@ -407,7 +407,17 @@ pub trait ConnectionUpgrade<C: AsyncRead + AsyncWrite> {
/// ///
/// Because performing the upgrade may not be instantaneous (eg. it may require a handshake), /// Because performing the upgrade may not be instantaneous (eg. it may require a handshake),
/// this function returns a future instead of the direct output. /// this function returns a future instead of the direct output.
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier) -> Self::Future; fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint)
-> Self::Future;
}
/// Type of connection for the upgrade.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Endpoint {
/// The socket comes from a dialer.
Dialer,
/// The socket comes from a listener.
Listener,
} }
/// See `or_upgrade()`. /// See `or_upgrade()`.
@ -434,13 +444,15 @@ impl<C, A, B> ConnectionUpgrade<C> for OrUpgrade<A, B>
type Future = EitherConnUpgrFuture<A::Future, B::Future>; type Future = EitherConnUpgrFuture<A::Future, B::Future>;
#[inline] #[inline]
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier) -> Self::Future { fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint)
-> Self::Future
{
match id { match id {
EitherUpgradeIdentifier::First(id) => { EitherUpgradeIdentifier::First(id) => {
EitherConnUpgrFuture::First(self.0.upgrade(socket, id)) EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty))
} }
EitherUpgradeIdentifier::Second(id) => { EitherUpgradeIdentifier::Second(id) => {
EitherConnUpgrFuture::Second(self.1.upgrade(socket, id)) EitherConnUpgrFuture::Second(self.1.upgrade(socket, id, ty))
} }
} }
} }
@ -543,7 +555,7 @@ impl<C> ConnectionUpgrade<C> for PlainTextConfig
type NamesIter = iter::Once<(Bytes, ())>; type NamesIter = iter::Once<(Bytes, ())>;
#[inline] #[inline]
fn upgrade(self, i: C, _: ()) -> Self::Future { fn upgrade(self, i: C, _: (), _: Endpoint) -> Self::Future {
future_ok(i) future_ok(i)
} }
@ -616,7 +628,7 @@ impl<'a, T, C> UpgradedNode<T, C>
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade)) negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade))
}) })
.and_then(|(upgrade_id, connection, upgrade)| { .and_then(|(upgrade_id, connection, upgrade)| {
upgrade.upgrade(connection, upgrade_id) upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer)
}); });
Ok(Box::new(future)) Ok(Box::new(future))
@ -666,7 +678,8 @@ impl<'a, T, C> UpgradedNode<T, C>
.map_err(|err| IoError::new(IoErrorKind::Other, err)) .map_err(|err| IoError::new(IoErrorKind::Other, err))
}) })
.and_then(|(upgrade_id, connection, upgrade, client_addr)| { .and_then(|(upgrade_id, connection, upgrade, client_addr)| {
upgrade.upgrade(connection, upgrade_id).map(|s| (s, client_addr)) upgrade.upgrade(connection, upgrade_id, Endpoint::Listener)
.map(|s| (s, client_addr))
}); });
Ok((Box::new(stream), new_addr)) Ok((Box::new(stream), new_addr))