mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-23 06:41:34 +00:00
Rework the transport upgrade API. (#1240)
* Rework the transport upgrade API. ALthough transport upgrades must follow a specific pattern in order fot the resulting transport to be usable with a `Network` or `Swarm`, that pattern is currently not well reflected in the transport upgrade API. Rather, transport upgrades are rather laborious and involve non-trivial code duplication. This commit introduces a `transport::upgrade::Builder` that is obtained from `Transport::upgrade`. The `Builder` encodes the previously implicit rules for transport upgrades: 1. Authentication upgrades must happen first. 2. Any number of upgrades may follow. 3. A multiplexer upgrade must happen last. Since multiplexing is the last (regular) transport upgrade (because that upgrade yields a `StreamMuxer` which is no longer a `AsyncRead` / `AsyncWrite` resource, which the upgrade process is based on), the upgrade starts with `Transport::upgrade` and ends with `Builder::multiplex`, which drops back down to the `Transport`, providing a fluent API. Authentication and multiplexer upgrades must furthermore adhere to a minimal contract w.r.t their outputs: 1. An authentication upgrade is given an (async) I/O resource `C` and must produce a pair `(I, D)` where `I: ConnectionInfo` and `D` is a new (async) I/O resource `D`. 2. A multiplexer upgrade is given an (async) I/O resource `C` and must produce a `M: StreamMuxer`. To that end, two changes to the `secio` and `noise` protocols have been made: 1. The `secio` upgrade now outputs a pair of `(PeerId, SecioOutput)`. The former implements `ConnectionInfo` and the latter `AsyncRead` / `AsyncWrite`, fulfilling the `Builder` contract. 2. A new `NoiseAuthenticated` upgrade has been added that wraps around any noise upgrade (i.e. `NoiseConfig`) and has an output of `(PeerId, NoiseOutput)`, i.e. it checks if the `RemoteIdentity` from the handshake output is an `IdentityKey`, failing if that is not the case. This is the standard upgrade procedure one wants for integrating noise with libp2p-core/swarm. * Cleanup * Add a new integration test. * Add missing license.
This commit is contained in:
@ -60,6 +60,7 @@ pub use identity::PublicKey;
|
||||
pub use transport::Transport;
|
||||
pub use translation::address_translation;
|
||||
pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName};
|
||||
pub use nodes::ConnectionInfo;
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
|
||||
pub enum Endpoint {
|
||||
|
@ -782,9 +782,8 @@ where
|
||||
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
|
||||
TTrans::Error: Send + 'static,
|
||||
TTrans::Dial: Send + 'static,
|
||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||
TMuxer: Send + Sync + 'static,
|
||||
TMuxer::OutboundSubstream: Send,
|
||||
TMuxer::Substream: Send,
|
||||
TInEvent: Send + 'static,
|
||||
TOutEvent: Send + 'static,
|
||||
TConnInfo: Send + 'static,
|
||||
@ -937,12 +936,10 @@ where
|
||||
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
|
||||
TTrans::Dial: Send + 'static,
|
||||
TTrans::Error: Send + 'static,
|
||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||
TMuxer: Send + Sync + 'static,
|
||||
TMuxer::OutboundSubstream: Send,
|
||||
TMuxer::Substream: Send,
|
||||
TInEvent: Send + 'static,
|
||||
TOutEvent: Send + 'static,
|
||||
TConnInfo: Send + 'static,
|
||||
TPeerId: Send + 'static,
|
||||
{
|
||||
let reach_id = match self.transport().clone().dial(first.clone()) {
|
||||
@ -985,14 +982,12 @@ where
|
||||
TTrans::Error: Send + 'static,
|
||||
TTrans::Dial: Send + 'static,
|
||||
TTrans::ListenerUpgrade: Send + 'static,
|
||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||
TMuxer: Send + Sync + 'static,
|
||||
TMuxer::OutboundSubstream: Send,
|
||||
TMuxer::Substream: Send,
|
||||
TInEvent: Send + 'static,
|
||||
TOutEvent: Send + 'static,
|
||||
THandler: IntoNodeHandler<(TConnInfo, ConnectedPoint)> + Send + 'static,
|
||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||
THandlerErr: error::Error + Send + 'static,
|
||||
TConnInfo: Clone,
|
||||
TPeerId: AsRef<[u8]> + Send + 'static,
|
||||
@ -1151,7 +1146,6 @@ where
|
||||
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
|
||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||
TMuxer::OutboundSubstream: Send,
|
||||
TMuxer::Substream: Send,
|
||||
TInEvent: Send + 'static,
|
||||
TOutEvent: Send + 'static,
|
||||
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Clone + Send + 'static,
|
||||
|
@ -25,12 +25,11 @@
|
||||
//! any desired protocols. The rest of the module defines combinators for
|
||||
//! modifying a transport through composition with other transports or protocol upgrades.
|
||||
|
||||
use crate::{InboundUpgrade, OutboundUpgrade, ConnectedPoint};
|
||||
use crate::ConnectedPoint;
|
||||
use futures::prelude::*;
|
||||
use multiaddr::Multiaddr;
|
||||
use std::{error, fmt};
|
||||
use std::{error::Error, fmt};
|
||||
use std::time::Duration;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
pub mod and_then;
|
||||
pub mod boxed;
|
||||
@ -69,11 +68,7 @@ pub use self::upgrade::Upgrade;
|
||||
///
|
||||
/// Additional protocols can be layered on top of the connections established
|
||||
/// by a [`Transport`] through an upgrade mechanism that is initiated via
|
||||
/// [`with_upgrade`](Transport::with_upgrade) and optionally followed by further upgrades
|
||||
/// through chaining calls to [`with_upgrade`](Transport::with_upgrade) and
|
||||
/// [`and_then`](Transport::and_then). Thereby every upgrade yields a new [`Transport`]
|
||||
/// whose connection setup incorporates all earlier upgrades followed by the new upgrade,
|
||||
/// i.e. the order of the upgrades is significant.
|
||||
/// [`upgrade`](Transport::upgrade).
|
||||
///
|
||||
/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other
|
||||
/// > words, listening or dialing consumes the transport object. This has been designed
|
||||
@ -88,7 +83,7 @@ pub trait Transport {
|
||||
type Output;
|
||||
|
||||
/// An error that occurred during connection setup.
|
||||
type Error: error::Error;
|
||||
type Error: Error;
|
||||
|
||||
/// A stream of [`Output`](Transport::Output)s for inbound connections.
|
||||
///
|
||||
@ -127,7 +122,7 @@ pub trait Transport {
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
/// Turns this `Transport` into an abstract boxed transport.
|
||||
/// Turns the transport into an abstract boxed (i.e. heap-allocated) transport.
|
||||
fn boxed(self) -> boxed::Boxed<Self::Output, Self::Error>
|
||||
where Self: Sized + Clone + Send + Sync + 'static,
|
||||
Self::Dial: Send + 'static,
|
||||
@ -138,93 +133,89 @@ pub trait Transport {
|
||||
}
|
||||
|
||||
/// Applies a function on the connections created by the transport.
|
||||
fn map<F, O>(self, map: F) -> map::Map<Self, F>
|
||||
fn map<F, O>(self, f: F) -> map::Map<Self, F>
|
||||
where
|
||||
Self: Sized,
|
||||
F: FnOnce(Self::Output, ConnectedPoint) -> O + Clone
|
||||
{
|
||||
map::Map::new(self, map)
|
||||
map::Map::new(self, f)
|
||||
}
|
||||
|
||||
/// Applies a function on the errors generated by the futures of the transport.
|
||||
fn map_err<F, TNewErr>(self, map_err: F) -> map_err::MapErr<Self, F>
|
||||
fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
|
||||
where
|
||||
Self: Sized,
|
||||
F: FnOnce(Self::Error) -> TNewErr + Clone
|
||||
F: FnOnce(Self::Error) -> E + Clone
|
||||
{
|
||||
map_err::MapErr::new(self, map_err)
|
||||
map_err::MapErr::new(self, f)
|
||||
}
|
||||
|
||||
/// Builds a new transport that falls back to another transport when
|
||||
/// encountering errors on dialing or listening for connections.
|
||||
/// Adds a fallback transport that is used when encountering errors
|
||||
/// while establishing inbound or outbound connections.
|
||||
///
|
||||
/// The returned transport will act like `self`, except that if `listen_on` or `dial`
|
||||
/// return an error then `other` will be tried.
|
||||
fn or_transport<T>(self, other: T) -> OrTransport<Self, T>
|
||||
fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
|
||||
where
|
||||
Self: Sized,
|
||||
U: Transport,
|
||||
<U as Transport>::Error: 'static
|
||||
{
|
||||
OrTransport::new(self, other)
|
||||
}
|
||||
|
||||
/// Wraps this transport inside an [`Upgrade`].
|
||||
///
|
||||
/// Whenever an inbound or outbound connection is established by this
|
||||
/// transport, the upgrade is applied on the current state of the
|
||||
/// connection (which may have already gone through previous upgrades)
|
||||
/// as an [`upgrade::InboundUpgrade`] or [`upgrade::OutboundUpgrade`],
|
||||
/// respectively.
|
||||
fn with_upgrade<U, O, E>(self, upgrade: U) -> Upgrade<Self, U>
|
||||
where
|
||||
Self: Sized,
|
||||
Self::Output: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<Self::Output, Output = O, Error = E>,
|
||||
U: OutboundUpgrade<Self::Output, Output = O, Error = E>
|
||||
{
|
||||
Upgrade::new(self, upgrade)
|
||||
}
|
||||
|
||||
/// Applies a function producing an asynchronous result to every connection
|
||||
/// created by this transport.
|
||||
///
|
||||
/// This function can be used for ad-hoc protocol upgrades on a transport or
|
||||
/// for processing or adapting the output of an earlier upgrade before
|
||||
/// applying the next upgrade.
|
||||
fn and_then<C, F, O>(self, upgrade: C) -> and_then::AndThen<Self, C>
|
||||
/// This function can be used for ad-hoc protocol upgrades or
|
||||
/// for processing or adapting the output for following configurations.
|
||||
///
|
||||
/// For the high-level transport upgrade procedure, see [`Transport::upgrade`].
|
||||
fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
|
||||
where
|
||||
Self: Sized,
|
||||
C: FnOnce(Self::Output, ConnectedPoint) -> F + Clone,
|
||||
F: IntoFuture<Item = O>
|
||||
F: IntoFuture<Item = O>,
|
||||
<F as IntoFuture>::Error: Error + 'static
|
||||
{
|
||||
and_then::AndThen::new(self, upgrade)
|
||||
and_then::AndThen::new(self, f)
|
||||
}
|
||||
|
||||
/// Adds a timeout to the connection setup (including upgrades) for all inbound
|
||||
/// and outbound connection attempts.
|
||||
fn with_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
|
||||
/// Adds a timeout to the connection setup (including upgrades) for all
|
||||
/// inbound and outbound connections established through the transport.
|
||||
fn timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
Self: Sized
|
||||
{
|
||||
timeout::TransportTimeout::new(self, timeout)
|
||||
}
|
||||
|
||||
/// Adds a timeout to the connection setup (including upgrades) for all outbound
|
||||
/// connection attempts.
|
||||
fn with_outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
|
||||
/// connections established through the transport.
|
||||
fn outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
Self: Sized
|
||||
{
|
||||
timeout::TransportTimeout::with_outgoing_timeout(self, timeout)
|
||||
}
|
||||
|
||||
/// Adds a timeout to the connection setup (including upgrades) for all inbound
|
||||
/// connection attempts.
|
||||
fn with_inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
|
||||
/// connections established through the transport.
|
||||
fn inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
Self: Sized
|
||||
{
|
||||
timeout::TransportTimeout::with_ingoing_timeout(self, timeout)
|
||||
}
|
||||
|
||||
/// Begins a series of protocol upgrades via an [`upgrade::Builder`].
|
||||
fn upgrade(self) -> upgrade::Builder<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
Self::Error: 'static
|
||||
{
|
||||
upgrade::Builder::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Event produced by [`Transport::Listener`]s.
|
||||
@ -362,10 +353,10 @@ where TErr: fmt::Display,
|
||||
}
|
||||
}
|
||||
|
||||
impl<TErr> error::Error for TransportError<TErr>
|
||||
where TErr: error::Error + 'static,
|
||||
impl<TErr> Error for TransportError<TErr>
|
||||
where TErr: Error + 'static,
|
||||
{
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
||||
match self {
|
||||
TransportError::MultiaddrNotSupported(_) => None,
|
||||
TransportError::Other(err) => Some(err),
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2017-2018 Parity Technologies (UK) Ltd.
|
||||
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
@ -18,9 +18,20 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
//! Configuration of transport protocol upgrades.
|
||||
|
||||
use crate::{
|
||||
transport::{Transport, TransportError, ListenerEvent},
|
||||
ConnectedPoint,
|
||||
ConnectionInfo,
|
||||
transport::{
|
||||
Transport,
|
||||
TransportError,
|
||||
ListenerEvent,
|
||||
and_then::AndThen,
|
||||
},
|
||||
muxing::StreamMuxer,
|
||||
upgrade::{
|
||||
self,
|
||||
OutboundUpgrade,
|
||||
InboundUpgrade,
|
||||
apply_inbound,
|
||||
@ -30,11 +41,193 @@ use crate::{
|
||||
InboundUpgradeApply
|
||||
}
|
||||
};
|
||||
use futures::{future::Either, prelude::*, try_ready};
|
||||
use futures::{future, prelude::*, try_ready};
|
||||
use multiaddr::Multiaddr;
|
||||
use std::{error, fmt};
|
||||
use std::{error::Error, fmt};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
/// A `Builder` facilitates upgrading of a [`Transport`] for use with
|
||||
/// a [`Network`].
|
||||
///
|
||||
/// The upgrade process is defined by the following stages:
|
||||
///
|
||||
/// [`authenticate`](Builder::authenticate)`{1}`
|
||||
/// -> [`apply`](Builder::apply)`{*}`
|
||||
/// -> [`multiplex`](Builder::multiplex)`{1}`
|
||||
///
|
||||
/// It thus enforces the following invariants on every transport
|
||||
/// obtained from [`multiplex`](Builder::multiplex):
|
||||
///
|
||||
/// 1. The transport must be [authenticated](Builder::authenticate)
|
||||
/// and [multiplexed](Builder::multiplex).
|
||||
/// 2. Authentication must precede the negotiation of a multiplexer.
|
||||
/// 3. Applying a multiplexer is the last step in the upgrade process.
|
||||
/// 4. The [`Transport::Output`] conforms to the requirements of a [`Network`],
|
||||
/// namely a tuple of a [`ConnectionInfo`] (from the authentication upgrade) and a
|
||||
/// [`StreamMuxer`] (from the multiplexing upgrade).
|
||||
///
|
||||
/// [`Network`]: crate::nodes::Network
|
||||
pub struct Builder<T> {
|
||||
inner: T
|
||||
}
|
||||
|
||||
impl<T> Builder<T>
|
||||
where
|
||||
T: Transport,
|
||||
T::Error: 'static,
|
||||
{
|
||||
/// Creates a `Builder` over the given (base) `Transport`.
|
||||
pub fn new(transport: T) -> Builder<T> {
|
||||
Builder { inner: transport }
|
||||
}
|
||||
|
||||
/// Upgrades the transport to perform authentication of the remote.
|
||||
///
|
||||
/// The supplied upgrade receives the I/O resource `C` and must
|
||||
/// produce a pair `(I, D)`, where `I` is a [`ConnectionInfo`] and
|
||||
/// `D` is a new I/O resource. The upgrade must thus at a minimum
|
||||
/// identify the remote, which typically involves the use of a
|
||||
/// cryptographic authentication protocol in the context of establishing
|
||||
/// a secure channel.
|
||||
///
|
||||
/// ## Transitions
|
||||
///
|
||||
/// * I/O upgrade: `C -> (I, D)`.
|
||||
/// * Transport output: `C -> (I, D)`
|
||||
pub fn authenticate<C, D, U, I, E>(self, upgrade: U) -> Builder<
|
||||
AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>
|
||||
> where
|
||||
T: Transport<Output = C>,
|
||||
I: ConnectionInfo,
|
||||
C: AsyncRead + AsyncWrite,
|
||||
D: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<C, Output = (I, D), Error = E>,
|
||||
U: OutboundUpgrade<C, Output = (I, D), Error = E> + Clone,
|
||||
E: Error + 'static,
|
||||
{
|
||||
Builder::new(self.inner.and_then(move |conn, endpoint| {
|
||||
Authenticate {
|
||||
inner: upgrade::apply(conn, upgrade, endpoint)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
/// Applies an arbitrary upgrade on an authenticated, non-multiplexed
|
||||
/// transport.
|
||||
///
|
||||
/// The upgrade receives the I/O resource (i.e. connection) `C` and
|
||||
/// must produce a new I/O resource `D`. Any number of such upgrades
|
||||
/// can be performed.
|
||||
///
|
||||
/// ## Transitions
|
||||
///
|
||||
/// * I/O upgrade: `C -> D`.
|
||||
/// * Transport output: `(I, C) -> (I, D)`.
|
||||
pub fn apply<C, D, U, I, E>(self, upgrade: U) -> Builder<Upgrade<T, U>>
|
||||
where
|
||||
T: Transport<Output = (I, C)>,
|
||||
C: AsyncRead + AsyncWrite,
|
||||
D: AsyncRead + AsyncWrite,
|
||||
I: ConnectionInfo,
|
||||
U: InboundUpgrade<C, Output = D, Error = E>,
|
||||
U: OutboundUpgrade<C, Output = D, Error = E> + Clone,
|
||||
E: Error + 'static,
|
||||
{
|
||||
Builder::new(Upgrade::new(self.inner, upgrade))
|
||||
}
|
||||
|
||||
/// Upgrades the transport with a (sub)stream multiplexer.
|
||||
///
|
||||
/// The supplied upgrade receives the I/O resource `C` and must
|
||||
/// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
|
||||
/// This ends the (regular) transport upgrade process, yielding the underlying,
|
||||
/// configured transport.
|
||||
///
|
||||
/// ## Transitions
|
||||
///
|
||||
/// * I/O upgrade: `C -> M`.
|
||||
/// * Transport output: `(I, C) -> (I, M)`.
|
||||
pub fn multiplex<C, M, U, I, E>(self, upgrade: U)
|
||||
-> AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
|
||||
where
|
||||
T: Transport<Output = (I, C)>,
|
||||
C: AsyncRead + AsyncWrite,
|
||||
M: StreamMuxer,
|
||||
I: ConnectionInfo,
|
||||
U: InboundUpgrade<C, Output = M, Error = E>,
|
||||
U: OutboundUpgrade<C, Output = M, Error = E> + Clone,
|
||||
E: Error + 'static,
|
||||
{
|
||||
self.inner.and_then(move |(i, c), endpoint| {
|
||||
let upgrade = upgrade::apply(c, upgrade, endpoint);
|
||||
Multiplex { info: Some(i), upgrade }
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// An upgrade that authenticates the remote peer, typically
|
||||
/// in the context of negotiating a secure channel.
|
||||
///
|
||||
/// Configured through [`Builder::authenticate`].
|
||||
pub struct Authenticate<C, U>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<C> + OutboundUpgrade<C>
|
||||
{
|
||||
inner: EitherUpgrade<C, U>
|
||||
}
|
||||
|
||||
impl<C, U> Future for Authenticate<C, U>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<C> + OutboundUpgrade<C,
|
||||
Output = <U as InboundUpgrade<C>>::Output,
|
||||
Error = <U as InboundUpgrade<C>>::Error
|
||||
>
|
||||
{
|
||||
type Item = <EitherUpgrade<C, U> as Future>::Item;
|
||||
type Error = <EitherUpgrade<C, U> as Future>::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
self.inner.poll()
|
||||
}
|
||||
}
|
||||
|
||||
/// An upgrade that negotiates a (sub)stream multiplexer on
|
||||
/// top of an authenticated transport.
|
||||
///
|
||||
/// Configured through [`Builder::multiplex`].
|
||||
pub struct Multiplex<C, U, I>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<C> + OutboundUpgrade<C>,
|
||||
{
|
||||
info: Option<I>,
|
||||
upgrade: EitherUpgrade<C, U>,
|
||||
}
|
||||
|
||||
impl<C, U, I, M, E> Future for Multiplex<C, U, I>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<C, Output = M, Error = E>,
|
||||
U: OutboundUpgrade<C, Output = M, Error = E>
|
||||
{
|
||||
type Item = (I, M);
|
||||
type Error = UpgradeError<E>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let m = try_ready!(self.upgrade.poll());
|
||||
let i = self.info.take().expect("Multiplex future polled after completion.");
|
||||
Ok(Async::Ready((i, m)))
|
||||
}
|
||||
}
|
||||
|
||||
/// An inbound or outbound upgrade.
|
||||
type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;
|
||||
|
||||
/// An upgrade on an authenticated, non-multiplexed [`Transport`].
|
||||
///
|
||||
/// See [`Builder::upgrade`](Builder::upgrade).
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct Upgrade<T, U> { inner: T, upgrade: U }
|
||||
|
||||
@ -44,50 +237,53 @@ impl<T, U> Upgrade<T, U> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<D, U, O, TUpgrErr> Transport for Upgrade<D, U>
|
||||
impl<T, C, D, U, I, E> Transport for Upgrade<T, U>
|
||||
where
|
||||
D: Transport,
|
||||
D::Output: AsyncRead + AsyncWrite,
|
||||
D::Error: 'static,
|
||||
U: InboundUpgrade<D::Output, Output = O, Error = TUpgrErr>,
|
||||
U: OutboundUpgrade<D::Output, Output = O, Error = TUpgrErr> + Clone,
|
||||
TUpgrErr: std::error::Error + Send + Sync + 'static // TODO: remove bounds
|
||||
T: Transport<Output = (I, C)>,
|
||||
T::Error: 'static,
|
||||
C: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<C, Output = D, Error = E>,
|
||||
U: OutboundUpgrade<C, Output = D, Error = E> + Clone,
|
||||
E: Error + 'static
|
||||
{
|
||||
type Output = O;
|
||||
type Error = TransportUpgradeError<D::Error, TUpgrErr>;
|
||||
type Listener = ListenerStream<D::Listener, U>;
|
||||
type ListenerUpgrade = ListenerUpgradeFuture<D::ListenerUpgrade, U>;
|
||||
type Dial = DialUpgradeFuture<D::Dial, U>;
|
||||
type Output = (I, D);
|
||||
type Error = TransportUpgradeError<T::Error, E>;
|
||||
type Listener = ListenerStream<T::Listener, U>;
|
||||
type ListenerUpgrade = ListenerUpgradeFuture<T::ListenerUpgrade, U, I, C>;
|
||||
type Dial = DialUpgradeFuture<T::Dial, U, I, C>;
|
||||
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
||||
let outbound = self.inner.dial(addr.clone())
|
||||
let future = self.inner.dial(addr.clone())
|
||||
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
|
||||
Ok(DialUpgradeFuture {
|
||||
future: outbound,
|
||||
upgrade: Either::A(Some(self.upgrade))
|
||||
future,
|
||||
upgrade: future::Either::A(Some(self.upgrade))
|
||||
})
|
||||
}
|
||||
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
|
||||
let inbound = self.inner.listen_on(addr)
|
||||
let stream = self.inner.listen_on(addr)
|
||||
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
|
||||
Ok(ListenerStream { stream: inbound, upgrade: self.upgrade })
|
||||
Ok(ListenerStream {
|
||||
stream,
|
||||
upgrade: self.upgrade
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Error produced by a transport upgrade.
|
||||
/// Errors produced by a transport upgrade.
|
||||
#[derive(Debug)]
|
||||
pub enum TransportUpgradeError<TTransErr, TUpgrErr> {
|
||||
pub enum TransportUpgradeError<T, U> {
|
||||
/// Error in the transport.
|
||||
Transport(TTransErr),
|
||||
Transport(T),
|
||||
/// Error while upgrading to a protocol.
|
||||
Upgrade(UpgradeError<TUpgrErr>),
|
||||
Upgrade(UpgradeError<U>),
|
||||
}
|
||||
|
||||
impl<TTransErr, TUpgrErr> fmt::Display for TransportUpgradeError<TTransErr, TUpgrErr>
|
||||
impl<T, U> fmt::Display for TransportUpgradeError<T, U>
|
||||
where
|
||||
TTransErr: fmt::Display,
|
||||
TUpgrErr: fmt::Display,
|
||||
T: fmt::Display,
|
||||
U: fmt::Display,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
@ -97,12 +293,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<TTransErr, TUpgrErr> error::Error for TransportUpgradeError<TTransErr, TUpgrErr>
|
||||
impl<T, U> Error for TransportUpgradeError<T, U>
|
||||
where
|
||||
TTransErr: error::Error + 'static,
|
||||
TUpgrErr: error::Error + 'static,
|
||||
T: Error + 'static,
|
||||
U: Error + 'static,
|
||||
{
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
||||
match self {
|
||||
TransportUpgradeError::Transport(e) => Some(e),
|
||||
TransportUpgradeError::Upgrade(e) => Some(e),
|
||||
@ -110,63 +306,67 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DialUpgradeFuture<T, U>
|
||||
/// The [`Transport::Dial`] future of an [`Upgrade`]d transport.
|
||||
pub struct DialUpgradeFuture<F, U, I, C>
|
||||
where
|
||||
T: Future,
|
||||
T::Item: AsyncRead + AsyncWrite,
|
||||
U: OutboundUpgrade<T::Item>
|
||||
U: OutboundUpgrade<C>,
|
||||
C: AsyncRead + AsyncWrite,
|
||||
{
|
||||
future: T,
|
||||
upgrade: Either<Option<U>, OutboundUpgradeApply<T::Item, U>>
|
||||
future: F,
|
||||
upgrade: future::Either<Option<U>, (Option<I>, OutboundUpgradeApply<C, U>)>
|
||||
}
|
||||
|
||||
impl<T, U> Future for DialUpgradeFuture<T, U>
|
||||
impl<F, U, I, C, D> Future for DialUpgradeFuture<F, U, I, C>
|
||||
where
|
||||
T: Future,
|
||||
T::Item: AsyncRead + AsyncWrite,
|
||||
U: OutboundUpgrade<T::Item>,
|
||||
U::Error: std::error::Error + Send + Sync + 'static
|
||||
F: Future<Item = (I, C)>,
|
||||
C: AsyncRead + AsyncWrite,
|
||||
U: OutboundUpgrade<C, Output = D>,
|
||||
U::Error: Error
|
||||
{
|
||||
type Item = U::Output;
|
||||
type Error = TransportUpgradeError<T::Error, U::Error>;
|
||||
type Item = (I, D);
|
||||
type Error = TransportUpgradeError<F::Error, U::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
loop {
|
||||
let next = match self.upgrade {
|
||||
Either::A(ref mut up) => {
|
||||
let x = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport));
|
||||
self.upgrade = match self.upgrade {
|
||||
future::Either::A(ref mut up) => {
|
||||
let (i, c) = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport));
|
||||
let u = up.take().expect("DialUpgradeFuture is constructed with Either::A(Some).");
|
||||
Either::B(apply_outbound(x, u))
|
||||
future::Either::B((Some(i), apply_outbound(c, u)))
|
||||
}
|
||||
future::Either::B((ref mut i, ref mut up)) => {
|
||||
let d = try_ready!(up.poll().map_err(TransportUpgradeError::Upgrade));
|
||||
let i = i.take().expect("DialUpgradeFuture polled after completion.");
|
||||
return Ok(Async::Ready((i, d)))
|
||||
}
|
||||
}
|
||||
Either::B(ref mut up) => return up.poll().map_err(TransportUpgradeError::Upgrade)
|
||||
};
|
||||
self.upgrade = next
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ListenerStream<T, U> {
|
||||
stream: T,
|
||||
/// The [`Transport::Listener`] stream of an [`Upgrade`]d transport.
|
||||
pub struct ListenerStream<S, U> {
|
||||
stream: S,
|
||||
upgrade: U
|
||||
}
|
||||
|
||||
impl<T, U, F> Stream for ListenerStream<T, U>
|
||||
impl<S, U, F, I, C, D> Stream for ListenerStream<S, U>
|
||||
where
|
||||
T: Stream<Item = ListenerEvent<F>>,
|
||||
F: Future,
|
||||
F::Item: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<F::Item> + Clone
|
||||
S: Stream<Item = ListenerEvent<F>>,
|
||||
F: Future<Item = (I, C)>,
|
||||
C: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<C, Output = D> + Clone
|
||||
{
|
||||
type Item = ListenerEvent<ListenerUpgradeFuture<F, U>>;
|
||||
type Error = TransportUpgradeError<T::Error, U::Error>;
|
||||
type Item = ListenerEvent<ListenerUpgradeFuture<F, U, I, C>>;
|
||||
type Error = TransportUpgradeError<S::Error, U::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
match try_ready!(self.stream.poll().map_err(TransportUpgradeError::Transport)) {
|
||||
Some(event) => {
|
||||
let event = event.map(move |x| {
|
||||
let event = event.map(move |future| {
|
||||
ListenerUpgradeFuture {
|
||||
future: x,
|
||||
upgrade: Either::A(Some(self.upgrade.clone()))
|
||||
future,
|
||||
upgrade: future::Either::A(Some(self.upgrade.clone()))
|
||||
}
|
||||
});
|
||||
Ok(Async::Ready(Some(event)))
|
||||
@ -176,37 +376,40 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ListenerUpgradeFuture<T, U>
|
||||
/// The [`Transport::ListenerUpgrade`] future of an [`Upgrade`]d transport.
|
||||
pub struct ListenerUpgradeFuture<F, U, I, C>
|
||||
where
|
||||
T: Future,
|
||||
T::Item: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<T::Item>
|
||||
C: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<C>
|
||||
{
|
||||
future: T,
|
||||
upgrade: Either<Option<U>, InboundUpgradeApply<T::Item, U>>
|
||||
future: F,
|
||||
upgrade: future::Either<Option<U>, (Option<I>, InboundUpgradeApply<C, U>)>
|
||||
}
|
||||
|
||||
impl<T, U> Future for ListenerUpgradeFuture<T, U>
|
||||
impl<F, U, I, C, D> Future for ListenerUpgradeFuture<F, U, I, C>
|
||||
where
|
||||
T: Future,
|
||||
T::Item: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<T::Item>,
|
||||
U::Error: std::error::Error + Send + Sync + 'static
|
||||
F: Future<Item = (I, C)>,
|
||||
C: AsyncRead + AsyncWrite,
|
||||
U: InboundUpgrade<C, Output = D>,
|
||||
U::Error: Error
|
||||
{
|
||||
type Item = U::Output;
|
||||
type Error = TransportUpgradeError<T::Error, U::Error>;
|
||||
type Item = (I, D);
|
||||
type Error = TransportUpgradeError<F::Error, U::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
loop {
|
||||
let next = match self.upgrade {
|
||||
Either::A(ref mut up) => {
|
||||
let x = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport));
|
||||
self.upgrade = match self.upgrade {
|
||||
future::Either::A(ref mut up) => {
|
||||
let (i, c) = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport));
|
||||
let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::A(Some).");
|
||||
Either::B(apply_inbound(x, u))
|
||||
future::Either::B((Some(i), apply_inbound(c, u)))
|
||||
}
|
||||
future::Either::B((ref mut i, ref mut up)) => {
|
||||
let d = try_ready!(up.poll().map_err(TransportUpgradeError::Upgrade));
|
||||
let i = i.take().expect("ListenerUpgradeFuture polled after completion.");
|
||||
return Ok(Async::Ready((i, d)))
|
||||
}
|
||||
}
|
||||
Either::B(ref mut up) => return up.poll().map_err(TransportUpgradeError::Upgrade)
|
||||
};
|
||||
self.upgrade = next
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ use futures::{future, prelude::*};
|
||||
use libp2p_core::identity;
|
||||
use libp2p_core::multiaddr::multiaddr;
|
||||
use libp2p_core::nodes::network::{Network, NetworkEvent, NetworkReachError, PeerState, UnknownPeerDialErr, IncomingError};
|
||||
use libp2p_core::{PeerId, Transport, upgrade, upgrade::InboundUpgradeExt, upgrade::OutboundUpgradeExt};
|
||||
use libp2p_core::{PeerId, Transport, upgrade};
|
||||
use libp2p_swarm::{
|
||||
ProtocolsHandler,
|
||||
KeepAlive,
|
||||
@ -91,21 +91,13 @@ where
|
||||
fn deny_incoming_connec() {
|
||||
// Checks whether refusing an incoming connection on a swarm triggers the correct events.
|
||||
|
||||
// TODO: make creating the transport more elegant ; literaly half of the code of the test
|
||||
// is about creating the transport
|
||||
let mut swarm1: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler<_>>, _> = {
|
||||
let local_key = identity::Keypair::generate_ed25519();
|
||||
let local_public_key = local_key.public();
|
||||
let transport = libp2p_tcp::TcpConfig::new()
|
||||
.with_upgrade(libp2p_secio::SecioConfig::new(local_key))
|
||||
.and_then(move |out, endpoint| {
|
||||
let peer_id = out.remote_key.into_peer_id();
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = libp2p_mplex::MplexConfig::default()
|
||||
.map_outbound(move |muxer| (peer_id, muxer))
|
||||
.map_inbound(move |muxer| (peer_id2, muxer));
|
||||
upgrade::apply(out.stream, upgrade, endpoint)
|
||||
});
|
||||
.upgrade()
|
||||
.authenticate(libp2p_secio::SecioConfig::new(local_key))
|
||||
.multiplex(libp2p_mplex::MplexConfig::new());
|
||||
Network::new(transport, local_public_key.into())
|
||||
};
|
||||
|
||||
@ -113,15 +105,9 @@ fn deny_incoming_connec() {
|
||||
let local_key = identity::Keypair::generate_ed25519();
|
||||
let local_public_key = local_key.public();
|
||||
let transport = libp2p_tcp::TcpConfig::new()
|
||||
.with_upgrade(libp2p_secio::SecioConfig::new(local_key))
|
||||
.and_then(move |out, endpoint| {
|
||||
let peer_id = out.remote_key.into_peer_id();
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = libp2p_mplex::MplexConfig::default()
|
||||
.map_outbound(move |muxer| (peer_id, muxer))
|
||||
.map_inbound(move |muxer| (peer_id2, muxer));
|
||||
upgrade::apply(out.stream, upgrade, endpoint)
|
||||
});
|
||||
.upgrade()
|
||||
.authenticate(libp2p_secio::SecioConfig::new(local_key))
|
||||
.multiplex(libp2p_mplex::MplexConfig::new());
|
||||
Network::new(transport, local_public_key.into())
|
||||
};
|
||||
|
||||
@ -180,27 +166,18 @@ fn dial_self() {
|
||||
//
|
||||
// The last two items can happen in any order.
|
||||
|
||||
// TODO: make creating the transport more elegant ; literaly half of the code of the test
|
||||
// is about creating the transport
|
||||
let mut swarm = {
|
||||
let local_key = identity::Keypair::generate_ed25519();
|
||||
let local_public_key = local_key.public();
|
||||
let transport = libp2p_tcp::TcpConfig::new()
|
||||
.with_upgrade(libp2p_secio::SecioConfig::new(local_key))
|
||||
.and_then(move |out, endpoint| {
|
||||
let peer_id = out.remote_key.into_peer_id();
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = libp2p_mplex::MplexConfig::default()
|
||||
.map_outbound(move |muxer| (peer_id, muxer))
|
||||
.map_inbound(move |muxer| (peer_id2, muxer));
|
||||
upgrade::apply(out.stream, upgrade, endpoint)
|
||||
})
|
||||
.upgrade()
|
||||
.authenticate(libp2p_secio::SecioConfig::new(local_key))
|
||||
.multiplex(libp2p_mplex::MplexConfig::new())
|
||||
.and_then(|(peer, mplex), _| {
|
||||
// Gracefully close the connection to allow protocol
|
||||
// negotiation to complete.
|
||||
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
|
||||
});
|
||||
|
||||
Network::new(transport, local_public_key.into())
|
||||
};
|
||||
|
||||
@ -268,21 +245,13 @@ fn dial_self_by_id() {
|
||||
// Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
|
||||
// place.
|
||||
|
||||
// TODO: make creating the transport more elegant ; literaly half of the code of the test
|
||||
// is about creating the transport
|
||||
let mut swarm: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler<_>>, _> = {
|
||||
let local_key = identity::Keypair::generate_ed25519();
|
||||
let local_public_key = local_key.public();
|
||||
let transport = libp2p_tcp::TcpConfig::new()
|
||||
.with_upgrade(libp2p_secio::SecioConfig::new(local_key))
|
||||
.and_then(move |out, endpoint| {
|
||||
let peer_id = out.remote_key.into_peer_id();
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = libp2p_mplex::MplexConfig::default()
|
||||
.map_outbound(move |muxer| (peer_id, muxer))
|
||||
.map_inbound(move |muxer| (peer_id2, muxer));
|
||||
upgrade::apply(out.stream, upgrade, endpoint)
|
||||
});
|
||||
.upgrade()
|
||||
.authenticate(libp2p_secio::SecioConfig::new(local_key))
|
||||
.multiplex(libp2p_mplex::MplexConfig::new());
|
||||
Network::new(transport, local_public_key.into())
|
||||
};
|
||||
|
||||
@ -294,21 +263,13 @@ fn dial_self_by_id() {
|
||||
fn multiple_addresses_err() {
|
||||
// Tries dialing multiple addresses, and makes sure there's one dialing error per addresses.
|
||||
|
||||
// TODO: make creating the transport more elegant ; literaly half of the code of the test
|
||||
// is about creating the transport
|
||||
let mut swarm = {
|
||||
let local_key = identity::Keypair::generate_ed25519();
|
||||
let local_public_key = local_key.public();
|
||||
let transport = libp2p_tcp::TcpConfig::new()
|
||||
.with_upgrade(libp2p_secio::SecioConfig::new(local_key))
|
||||
.and_then(move |out, endpoint| {
|
||||
let peer_id = out.remote_key.into_peer_id();
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = libp2p_mplex::MplexConfig::default()
|
||||
.map_outbound(move |muxer| (peer_id, muxer))
|
||||
.map_inbound(move |muxer| (peer_id2, muxer));
|
||||
upgrade::apply(out.stream, upgrade, endpoint)
|
||||
});
|
||||
.upgrade()
|
||||
.authenticate(libp2p_secio::SecioConfig::new(local_key))
|
||||
.multiplex(libp2p_mplex::MplexConfig::new());
|
||||
Network::new(transport, local_public_key.into())
|
||||
};
|
||||
|
||||
|
@ -21,16 +21,15 @@
|
||||
mod util;
|
||||
|
||||
use futures::{future, prelude::*};
|
||||
use libp2p_core::identity;
|
||||
use libp2p_core::{identity, upgrade, Transport};
|
||||
use libp2p_core::nodes::{Network, NetworkEvent, Peer};
|
||||
use libp2p_core::nodes::network::IncomingError;
|
||||
use libp2p_core::{Transport, upgrade, upgrade::OutboundUpgradeExt, upgrade::InboundUpgradeExt};
|
||||
use libp2p_swarm::{
|
||||
ProtocolsHandler,
|
||||
KeepAlive,
|
||||
SubstreamProtocol,
|
||||
ProtocolsHandlerEvent,
|
||||
ProtocolsHandlerUpgrErr
|
||||
ProtocolsHandlerUpgrErr,
|
||||
};
|
||||
use std::{io, time::Duration};
|
||||
use wasm_timer::{Delay, Instant};
|
||||
@ -107,21 +106,13 @@ fn raw_swarm_simultaneous_connect() {
|
||||
// despite the fact that it adds a dependency.
|
||||
|
||||
for _ in 0 .. 10 {
|
||||
// TODO: make creating the transport more elegant ; literaly half of the code of the test
|
||||
// is about creating the transport
|
||||
let mut swarm1 = {
|
||||
let local_key = identity::Keypair::generate_ed25519();
|
||||
let local_public_key = local_key.public();
|
||||
let transport = libp2p_tcp::TcpConfig::new()
|
||||
.with_upgrade(libp2p_secio::SecioConfig::new(local_key))
|
||||
.and_then(move |out, endpoint| {
|
||||
let peer_id = out.remote_key.into_peer_id();
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = libp2p_mplex::MplexConfig::default()
|
||||
.map_outbound(move |muxer| (peer_id, muxer))
|
||||
.map_inbound(move |muxer| (peer_id2, muxer));
|
||||
upgrade::apply(out.stream, upgrade, endpoint)
|
||||
})
|
||||
.upgrade()
|
||||
.authenticate(libp2p_secio::SecioConfig::new(local_key))
|
||||
.multiplex(libp2p_mplex::MplexConfig::new())
|
||||
.and_then(|(peer, mplex), _| {
|
||||
// Gracefully close the connection to allow protocol
|
||||
// negotiation to complete.
|
||||
@ -134,15 +125,9 @@ fn raw_swarm_simultaneous_connect() {
|
||||
let local_key = identity::Keypair::generate_ed25519();
|
||||
let local_public_key = local_key.public();
|
||||
let transport = libp2p_tcp::TcpConfig::new()
|
||||
.with_upgrade(libp2p_secio::SecioConfig::new(local_key))
|
||||
.and_then(move |out, endpoint| {
|
||||
let peer_id = out.remote_key.into_peer_id();
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = libp2p_mplex::MplexConfig::default()
|
||||
.map_outbound(move |muxer| (peer_id, muxer))
|
||||
.map_inbound(move |muxer| (peer_id2, muxer));
|
||||
upgrade::apply(out.stream, upgrade, endpoint)
|
||||
})
|
||||
.upgrade()
|
||||
.authenticate(libp2p_secio::SecioConfig::new(local_key))
|
||||
.multiplex(libp2p_mplex::MplexConfig::new())
|
||||
.and_then(|(peer, mplex), _| {
|
||||
// Gracefully close the connection to allow protocol
|
||||
// negotiation to complete.
|
||||
@ -313,3 +298,4 @@ fn raw_swarm_simultaneous_connect() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
128
core/tests/transport_upgrade.rs
Normal file
128
core/tests/transport_upgrade.rs
Normal file
@ -0,0 +1,128 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
mod util;
|
||||
|
||||
use futures::future::Future;
|
||||
use futures::stream::Stream;
|
||||
use libp2p_core::identity;
|
||||
use libp2p_core::transport::{Transport, MemoryTransport, ListenerEvent};
|
||||
use libp2p_core::upgrade::{UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade};
|
||||
use libp2p_mplex::MplexConfig;
|
||||
use libp2p_secio::SecioConfig;
|
||||
use multiaddr::Multiaddr;
|
||||
use rand::random;
|
||||
use std::io;
|
||||
use tokio_io::{io as nio, AsyncWrite, AsyncRead};
|
||||
|
||||
#[derive(Clone)]
|
||||
struct HelloUpgrade {}
|
||||
|
||||
impl UpgradeInfo for HelloUpgrade {
|
||||
type Info = &'static str;
|
||||
type InfoIter = std::iter::Once<Self::Info>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
std::iter::once("/hello/1")
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> InboundUpgrade<C> for HelloUpgrade
|
||||
where
|
||||
C: AsyncRead + AsyncWrite + Send + 'static
|
||||
{
|
||||
type Output = Negotiated<C>;
|
||||
type Error = io::Error;
|
||||
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
|
||||
|
||||
fn upgrade_inbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
|
||||
Box::new(nio::read_exact(socket, [0u8; 5]).map(|(io, buf)| {
|
||||
assert_eq!(&buf[..], "hello".as_bytes());
|
||||
io
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> OutboundUpgrade<C> for HelloUpgrade
|
||||
where
|
||||
C: AsyncWrite + AsyncRead + Send + 'static,
|
||||
{
|
||||
type Output = Negotiated<C>;
|
||||
type Error = io::Error;
|
||||
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
|
||||
|
||||
fn upgrade_outbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
|
||||
Box::new(nio::write_all(socket, "hello").map(|(io, _)| io))
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn upgrade_pipeline() {
|
||||
let listener_keys = identity::Keypair::generate_ed25519();
|
||||
let listener_id = listener_keys.public().into_peer_id();
|
||||
let listener_transport = MemoryTransport::default()
|
||||
.upgrade()
|
||||
.authenticate(SecioConfig::new(listener_keys))
|
||||
.apply(HelloUpgrade {})
|
||||
.apply(HelloUpgrade {})
|
||||
.apply(HelloUpgrade {})
|
||||
.multiplex(MplexConfig::default())
|
||||
.and_then(|(peer, mplex), _| {
|
||||
// Gracefully close the connection to allow protocol
|
||||
// negotiation to complete.
|
||||
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
|
||||
});
|
||||
|
||||
let dialer_keys = identity::Keypair::generate_ed25519();
|
||||
let dialer_id = dialer_keys.public().into_peer_id();
|
||||
let dialer_transport = MemoryTransport::default()
|
||||
.upgrade()
|
||||
.authenticate(SecioConfig::new(dialer_keys))
|
||||
.apply(HelloUpgrade {})
|
||||
.apply(HelloUpgrade {})
|
||||
.apply(HelloUpgrade {})
|
||||
.multiplex(MplexConfig::default())
|
||||
.and_then(|(peer, mplex), _| {
|
||||
// Gracefully close the connection to allow protocol
|
||||
// negotiation to complete.
|
||||
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
|
||||
});
|
||||
|
||||
let listen_addr: Multiaddr = format!("/memory/{}", random::<u64>()).parse().unwrap();
|
||||
let listener = listener_transport.listen_on(listen_addr.clone()).unwrap()
|
||||
.filter_map(ListenerEvent::into_upgrade)
|
||||
.for_each(move |(upgrade, _remote_addr)| {
|
||||
let dialer = dialer_id.clone();
|
||||
upgrade.map(move |(peer, _mplex)| {
|
||||
assert_eq!(peer, dialer)
|
||||
})
|
||||
})
|
||||
.map_err(|e| panic!("Listener error: {}", e));
|
||||
|
||||
let dialer = dialer_transport.dial(listen_addr).unwrap()
|
||||
.map(move |(peer, _mplex)| {
|
||||
assert_eq!(peer, listener_id)
|
||||
});
|
||||
|
||||
let mut rt = tokio::runtime::Runtime::new().unwrap();
|
||||
rt.spawn(listener);
|
||||
rt.block_on(dialer).unwrap()
|
||||
}
|
||||
|
@ -18,7 +18,7 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use libp2p_core::{muxing, Transport, transport::ListenerEvent};
|
||||
use libp2p_core::{muxing, upgrade, Transport, transport::ListenerEvent};
|
||||
use libp2p_tcp::TcpConfig;
|
||||
use futures::prelude::*;
|
||||
use std::sync::{Arc, mpsc};
|
||||
@ -32,8 +32,9 @@ fn async_write() {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let bg_thread = thread::spawn(move || {
|
||||
let transport =
|
||||
TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new());
|
||||
let mplex = libp2p_mplex::MplexConfig::new();
|
||||
|
||||
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
|
||||
|
||||
let mut listener = transport
|
||||
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
|
||||
@ -67,7 +68,8 @@ fn async_write() {
|
||||
let _ = rt.block_on(future).unwrap();
|
||||
});
|
||||
|
||||
let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new());
|
||||
let mplex = libp2p_mplex::MplexConfig::new();
|
||||
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
|
||||
|
||||
let future = transport
|
||||
.dial(rx.recv().unwrap())
|
||||
|
@ -18,7 +18,7 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use libp2p_core::{muxing, Transport, transport::ListenerEvent};
|
||||
use libp2p_core::{muxing, upgrade, Transport, transport::ListenerEvent};
|
||||
use libp2p_tcp::TcpConfig;
|
||||
use futures::prelude::*;
|
||||
use std::sync::{Arc, mpsc};
|
||||
@ -35,8 +35,9 @@ fn client_to_server_outbound() {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let bg_thread = thread::spawn(move || {
|
||||
let transport =
|
||||
TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new());
|
||||
let mplex = libp2p_mplex::MplexConfig::new();
|
||||
|
||||
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
|
||||
|
||||
let mut listener = transport
|
||||
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
|
||||
@ -75,7 +76,8 @@ fn client_to_server_outbound() {
|
||||
let _ = rt.block_on(future).unwrap();
|
||||
});
|
||||
|
||||
let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new());
|
||||
let mplex = libp2p_mplex::MplexConfig::new();
|
||||
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
|
||||
|
||||
let future = transport
|
||||
.dial(rx.recv().unwrap())
|
||||
@ -98,8 +100,8 @@ fn client_to_server_inbound() {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let bg_thread = thread::spawn(move || {
|
||||
let transport =
|
||||
TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new());
|
||||
let mplex = libp2p_mplex::MplexConfig::new();
|
||||
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
|
||||
|
||||
let mut listener = transport
|
||||
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
|
||||
@ -139,7 +141,8 @@ fn client_to_server_inbound() {
|
||||
let _ = rt.block_on(future).unwrap();
|
||||
});
|
||||
|
||||
let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new());
|
||||
let mplex = libp2p_mplex::MplexConfig::new();
|
||||
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
|
||||
|
||||
let future = transport
|
||||
.dial(rx.recv().unwrap())
|
||||
|
@ -20,7 +20,7 @@
|
||||
|
||||
use futures::prelude::*;
|
||||
use libp2p_core::transport::{ListenerEvent, Transport};
|
||||
use libp2p_core::upgrade::Negotiated;
|
||||
use libp2p_core::upgrade::{self, Negotiated};
|
||||
use libp2p_deflate::{DeflateConfig, DeflateOutput};
|
||||
use libp2p_tcp::{TcpConfig, TcpTransStream};
|
||||
use log::info;
|
||||
@ -32,9 +32,9 @@ fn deflate() {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
fn prop(message: Vec<u8>) -> bool {
|
||||
let server_transport = TcpConfig::new().with_upgrade(DeflateConfig {});
|
||||
let client_transport = TcpConfig::new().with_upgrade(DeflateConfig {});
|
||||
run(server_transport, client_transport, message);
|
||||
let client = TcpConfig::new().and_then(|c, e| upgrade::apply(c, DeflateConfig {}, e));
|
||||
let server = client.clone();
|
||||
run(server, client, message);
|
||||
true
|
||||
}
|
||||
|
||||
|
@ -259,10 +259,9 @@ mod tests {
|
||||
use libp2p_core::{
|
||||
identity,
|
||||
PeerId,
|
||||
upgrade::{self, OutboundUpgradeExt, InboundUpgradeExt},
|
||||
muxing::StreamMuxer,
|
||||
Multiaddr,
|
||||
Transport
|
||||
Transport,
|
||||
};
|
||||
use libp2p_tcp::TcpConfig;
|
||||
use libp2p_secio::SecioConfig;
|
||||
@ -283,15 +282,9 @@ mod tests {
|
||||
let pubkey = id_keys.public();
|
||||
let transport = TcpConfig::new()
|
||||
.nodelay(true)
|
||||
.with_upgrade(SecioConfig::new(id_keys))
|
||||
.and_then(move |out, endpoint| {
|
||||
let peer_id = out.remote_key.into_peer_id();
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = MplexConfig::default()
|
||||
.map_outbound(move |muxer| (peer_id, muxer))
|
||||
.map_inbound(move |muxer| (peer_id2, muxer));
|
||||
upgrade::apply(out.stream, upgrade, endpoint)
|
||||
});
|
||||
.upgrade()
|
||||
.authenticate(SecioConfig::new(id_keys))
|
||||
.multiplex(MplexConfig::new());
|
||||
(pubkey, transport)
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,6 @@ use libp2p_core::{
|
||||
nodes::Substream,
|
||||
multiaddr::{Protocol, multiaddr},
|
||||
muxing::StreamMuxerBox,
|
||||
upgrade,
|
||||
};
|
||||
use libp2p_secio::SecioConfig;
|
||||
use libp2p_swarm::Swarm;
|
||||
@ -61,18 +60,13 @@ fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<TestSwa
|
||||
let mut result: Vec<Swarm<_, _>> = Vec::with_capacity(num);
|
||||
|
||||
for _ in 0 .. num {
|
||||
// TODO: make creating the transport more elegant ; literaly half of the code of the test
|
||||
// is about creating the transport
|
||||
let local_key = identity::Keypair::generate_ed25519();
|
||||
let local_public_key = local_key.public();
|
||||
let transport = MemoryTransport::default()
|
||||
.with_upgrade(SecioConfig::new(local_key))
|
||||
.and_then(move |out, endpoint| {
|
||||
let peer_id = out.remote_key.into_peer_id();
|
||||
let yamux = yamux::Config::default();
|
||||
upgrade::apply(out.stream, yamux, endpoint)
|
||||
.map(|muxer| (peer_id, StreamMuxerBox::new(muxer)))
|
||||
})
|
||||
.upgrade()
|
||||
.authenticate(SecioConfig::new(local_key))
|
||||
.multiplex(yamux::Config::default())
|
||||
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
|
||||
.map_err(|e| panic!("Failed to create transport: {:?}", e))
|
||||
.boxed();
|
||||
|
||||
|
@ -31,6 +31,9 @@ pub enum NoiseError {
|
||||
Noise(SnowError),
|
||||
/// A public key is invalid.
|
||||
InvalidKey,
|
||||
/// Authentication in a [`NoiseAuthenticated`](crate::NoiseAuthenticated)
|
||||
/// upgrade failed.
|
||||
AuthenticationFailed,
|
||||
/// A handshake payload is invalid.
|
||||
InvalidPayload(protobuf::ProtobufError),
|
||||
/// A signature was required and could not be created.
|
||||
@ -46,6 +49,7 @@ impl fmt::Display for NoiseError {
|
||||
NoiseError::Noise(e) => write!(f, "{}", e),
|
||||
NoiseError::InvalidKey => f.write_str("invalid public key"),
|
||||
NoiseError::InvalidPayload(e) => write!(f, "{}", e),
|
||||
NoiseError::AuthenticationFailed => f.write_str("Authentication failed"),
|
||||
NoiseError::SigningError(e) => write!(f, "{}", e),
|
||||
NoiseError::__Nonexhaustive => f.write_str("__Nonexhaustive")
|
||||
}
|
||||
@ -58,6 +62,7 @@ impl Error for NoiseError {
|
||||
NoiseError::Io(e) => Some(e),
|
||||
NoiseError::Noise(_) => None, // TODO: `SnowError` should implement `Error`.
|
||||
NoiseError::InvalidKey => None,
|
||||
NoiseError::AuthenticationFailed => None,
|
||||
NoiseError::InvalidPayload(e) => Some(e),
|
||||
NoiseError::SigningError(e) => Some(e),
|
||||
NoiseError::__Nonexhaustive => None
|
||||
|
@ -43,9 +43,9 @@
|
||||
//! # fn main() {
|
||||
//! let id_keys = identity::Keypair::generate_ed25519();
|
||||
//! let dh_keys = Keypair::<X25519>::new().into_authentic(&id_keys).unwrap();
|
||||
//! let noise = NoiseConfig::xx(dh_keys);
|
||||
//! let transport = TcpConfig::new().with_upgrade(noise);
|
||||
//! // ...
|
||||
//! let noise = NoiseConfig::xx(dh_keys).into_authenticated();
|
||||
//! let builder = TcpConfig::new().upgrade().authenticate(noise);
|
||||
//! // let transport = builder.multiplex(...);
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
@ -61,7 +61,8 @@ pub use io::handshake::{Handshake, RemoteIdentity, IdentityExchange};
|
||||
pub use protocol::{Keypair, AuthenticKeypair, KeypairIdentity, PublicKey, SecretKey};
|
||||
pub use protocol::{Protocol, ProtocolParams, x25519::X25519, IX, IK, XX};
|
||||
|
||||
use libp2p_core::{identity, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::Negotiated};
|
||||
use futures::{future::{self, FutureResult}, Future};
|
||||
use libp2p_core::{identity, PeerId, UpgradeInfo, InboundUpgrade, OutboundUpgrade, Negotiated};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use zeroize::Zeroize;
|
||||
|
||||
@ -74,6 +75,14 @@ pub struct NoiseConfig<P, C: Zeroize, R = ()> {
|
||||
_marker: std::marker::PhantomData<P>
|
||||
}
|
||||
|
||||
impl<H, C: Zeroize, R> NoiseConfig<H, C, R> {
|
||||
/// Turn the `NoiseConfig` into an authenticated upgrade for use
|
||||
/// with a [`Network`](libp2p_core::nodes::Network).
|
||||
pub fn into_authenticated(self) -> NoiseAuthenticated<H, C, R> {
|
||||
NoiseAuthenticated { config: self }
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> NoiseConfig<IX, C>
|
||||
where
|
||||
C: Protocol<C> + Zeroize
|
||||
@ -277,3 +286,84 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// Authenticated Upgrades /////////////////////////////////////////////////////
|
||||
|
||||
/// A `NoiseAuthenticated` transport upgrade that wraps around any
|
||||
/// `NoiseConfig` handshake and verifies that the remote identified with a
|
||||
/// [`RemoteIdentity::IdentityKey`], aborting otherwise.
|
||||
///
|
||||
/// See [`NoiseConfig::into_authenticated`].
|
||||
///
|
||||
/// On success, the upgrade yields the [`PeerId`] obtained from the
|
||||
/// `RemoteIdentity`. The output of this upgrade is thus directly suitable
|
||||
/// for creating an [`authenticated`](libp2p_core::TransportBuilder::authenticate)
|
||||
/// transport for use with a [`Network`](libp2p_core::nodes::Network).
|
||||
#[derive(Clone)]
|
||||
pub struct NoiseAuthenticated<P, C: Zeroize, R> {
|
||||
config: NoiseConfig<P, C, R>
|
||||
}
|
||||
|
||||
impl<P, C: Zeroize, R> UpgradeInfo for NoiseAuthenticated<P, C, R>
|
||||
where
|
||||
NoiseConfig<P, C, R>: UpgradeInfo
|
||||
{
|
||||
type Info = <NoiseConfig<P, C, R> as UpgradeInfo>::Info;
|
||||
type InfoIter = <NoiseConfig<P, C, R> as UpgradeInfo>::InfoIter;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
self.config.protocol_info()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, P, C, R> InboundUpgrade<T> for NoiseAuthenticated<P, C, R>
|
||||
where
|
||||
NoiseConfig<P, C, R>: UpgradeInfo + InboundUpgrade<T,
|
||||
Output = (RemoteIdentity<C>, NoiseOutput<Negotiated<T>>),
|
||||
Error = NoiseError
|
||||
>,
|
||||
T: AsyncRead + AsyncWrite + Send + 'static,
|
||||
C: Protocol<C> + AsRef<[u8]> + Zeroize + Send + 'static,
|
||||
{
|
||||
type Output = (PeerId, NoiseOutput<Negotiated<T>>);
|
||||
type Error = NoiseError;
|
||||
type Future = future::AndThen<
|
||||
<NoiseConfig<P, C, R> as InboundUpgrade<T>>::Future,
|
||||
FutureResult<Self::Output, Self::Error>,
|
||||
fn((RemoteIdentity<C>, NoiseOutput<Negotiated<T>>)) -> FutureResult<Self::Output, Self::Error>
|
||||
>;
|
||||
|
||||
fn upgrade_inbound(self, socket: Negotiated<T>, info: Self::Info) -> Self::Future {
|
||||
self.config.upgrade_inbound(socket, info)
|
||||
.and_then(|(remote, io)| future::result(match remote {
|
||||
RemoteIdentity::IdentityKey(pk) => Ok((pk.into_peer_id(), io)),
|
||||
_ => Err(NoiseError::AuthenticationFailed)
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, P, C, R> OutboundUpgrade<T> for NoiseAuthenticated<P, C, R>
|
||||
where
|
||||
NoiseConfig<P, C, R>: UpgradeInfo + OutboundUpgrade<T,
|
||||
Output = (RemoteIdentity<C>, NoiseOutput<Negotiated<T>>),
|
||||
Error = NoiseError
|
||||
>,
|
||||
T: AsyncRead + AsyncWrite + Send + 'static,
|
||||
C: Protocol<C> + AsRef<[u8]> + Zeroize + Send + 'static,
|
||||
{
|
||||
type Output = (PeerId, NoiseOutput<Negotiated<T>>);
|
||||
type Error = NoiseError;
|
||||
type Future = future::AndThen<
|
||||
<NoiseConfig<P, C, R> as OutboundUpgrade<T>>::Future,
|
||||
FutureResult<Self::Output, Self::Error>,
|
||||
fn((RemoteIdentity<C>, NoiseOutput<Negotiated<T>>)) -> FutureResult<Self::Output, Self::Error>
|
||||
>;
|
||||
|
||||
fn upgrade_outbound(self, socket: Negotiated<T>, info: Self::Info) -> Self::Future {
|
||||
self.config.upgrade_outbound(socket, info)
|
||||
.and_then(|(remote, io)| future::result(match remote {
|
||||
RemoteIdentity::IdentityKey(pk) => Ok((pk.into_peer_id(), io)),
|
||||
_ => Err(NoiseError::AuthenticationFailed)
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,7 @@
|
||||
|
||||
use futures::{future::{self, Either}, prelude::*};
|
||||
use libp2p_core::identity;
|
||||
use libp2p_core::upgrade::{Negotiated, apply_inbound, apply_outbound};
|
||||
use libp2p_core::upgrade::{self, Negotiated, apply_inbound, apply_outbound};
|
||||
use libp2p_core::transport::{Transport, ListenerEvent};
|
||||
use libp2p_noise::{Keypair, X25519, NoiseConfig, RemoteIdentity, NoiseError, NoiseOutput};
|
||||
use libp2p_tcp::{TcpConfig, TcpTransStream};
|
||||
@ -28,6 +28,16 @@ use log::info;
|
||||
use quickcheck::QuickCheck;
|
||||
use tokio::{self, io};
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn core_upgrade_compat() {
|
||||
// Tests API compaibility with the libp2p-core upgrade API,
|
||||
// i.e. if it compiles, the "test" is considered a success.
|
||||
let id_keys = identity::Keypair::generate_ed25519();
|
||||
let dh_keys = Keypair::<X25519>::new().into_authentic(&id_keys).unwrap();
|
||||
let noise = NoiseConfig::xx(dh_keys).into_authenticated();
|
||||
let _ = TcpConfig::new().upgrade().authenticate(noise);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn xx() {
|
||||
let _ = env_logger::try_init();
|
||||
@ -40,12 +50,16 @@ fn xx() {
|
||||
|
||||
let server_dh = Keypair::<X25519>::new().into_authentic(&server_id).unwrap();
|
||||
let server_transport = TcpConfig::new()
|
||||
.with_upgrade(NoiseConfig::xx(server_dh))
|
||||
.and_then(move |output, endpoint| {
|
||||
upgrade::apply(output, NoiseConfig::xx(server_dh), endpoint)
|
||||
})
|
||||
.and_then(move |out, _| expect_identity(out, &client_id_public));
|
||||
|
||||
let client_dh = Keypair::<X25519>::new().into_authentic(&client_id).unwrap();
|
||||
let client_transport = TcpConfig::new()
|
||||
.with_upgrade(NoiseConfig::xx(client_dh))
|
||||
.and_then(move |output, endpoint| {
|
||||
upgrade::apply(output, NoiseConfig::xx(client_dh), endpoint)
|
||||
})
|
||||
.and_then(move |out, _| expect_identity(out, &server_id_public));
|
||||
|
||||
run(server_transport, client_transport, message);
|
||||
@ -66,12 +80,16 @@ fn ix() {
|
||||
|
||||
let server_dh = Keypair::<X25519>::new().into_authentic(&server_id).unwrap();
|
||||
let server_transport = TcpConfig::new()
|
||||
.with_upgrade(NoiseConfig::ix(server_dh))
|
||||
.and_then(move |output, endpoint| {
|
||||
upgrade::apply(output, NoiseConfig::ix(server_dh), endpoint)
|
||||
})
|
||||
.and_then(move |out, _| expect_identity(out, &client_id_public));
|
||||
|
||||
let client_dh = Keypair::<X25519>::new().into_authentic(&client_id).unwrap();
|
||||
let client_transport = TcpConfig::new()
|
||||
.with_upgrade(NoiseConfig::ix(client_dh))
|
||||
.and_then(move |output, endpoint| {
|
||||
upgrade::apply(output, NoiseConfig::ix(client_dh), endpoint)
|
||||
})
|
||||
.and_then(move |out, _| expect_identity(out, &server_id_public));
|
||||
|
||||
run(server_transport, client_transport, message);
|
||||
|
@ -23,16 +23,18 @@
|
||||
use libp2p_core::{
|
||||
Multiaddr,
|
||||
PeerId,
|
||||
Negotiated,
|
||||
identity,
|
||||
muxing::StreamMuxer,
|
||||
upgrade::{self, OutboundUpgradeExt, InboundUpgradeExt},
|
||||
transport::Transport
|
||||
transport::{Transport, boxed::Boxed},
|
||||
either::EitherError,
|
||||
upgrade::UpgradeError
|
||||
};
|
||||
use libp2p_ping::*;
|
||||
use libp2p_yamux as yamux;
|
||||
use libp2p_secio::SecioConfig;
|
||||
use libp2p_yamux::{self as yamux, Yamux};
|
||||
use libp2p_secio::{SecioConfig, SecioOutput, SecioError};
|
||||
use libp2p_swarm::Swarm;
|
||||
use libp2p_tcp::TcpConfig;
|
||||
use libp2p_tcp::{TcpConfig, TcpTransStream};
|
||||
use futures::{future, prelude::*};
|
||||
use std::{fmt, io, time::Duration, sync::mpsc::sync_channel};
|
||||
use tokio::runtime::Runtime;
|
||||
@ -101,26 +103,21 @@ fn ping() {
|
||||
assert!(rtt < Duration::from_millis(50));
|
||||
}
|
||||
|
||||
fn mk_transport() -> (PeerId, impl Transport<
|
||||
Output = (PeerId, impl StreamMuxer<Substream = impl Send, OutboundSubstream = impl Send, Error = impl Into<io::Error>>),
|
||||
Listener = impl Send,
|
||||
ListenerUpgrade = impl Send,
|
||||
Dial = impl Send,
|
||||
Error = impl fmt::Debug
|
||||
> + Clone) {
|
||||
fn mk_transport() -> (
|
||||
PeerId,
|
||||
Boxed<
|
||||
(PeerId, Yamux<Negotiated<SecioOutput<Negotiated<TcpTransStream>>>>),
|
||||
EitherError<EitherError<io::Error, UpgradeError<SecioError>>, UpgradeError<io::Error>>
|
||||
>
|
||||
) {
|
||||
let id_keys = identity::Keypair::generate_ed25519();
|
||||
let peer_id = id_keys.public().into_peer_id();
|
||||
let transport = TcpConfig::new()
|
||||
.nodelay(true)
|
||||
.with_upgrade(SecioConfig::new(id_keys))
|
||||
.and_then(move |out, endpoint| {
|
||||
let peer_id = out.remote_key.into_peer_id();
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = yamux::Config::default()
|
||||
.map_outbound(move |muxer| (peer_id, muxer))
|
||||
.map_inbound(move |muxer| (peer_id2, muxer));
|
||||
upgrade::apply(out.stream, upgrade, endpoint)
|
||||
});
|
||||
.upgrade()
|
||||
.authenticate(SecioConfig::new(id_keys))
|
||||
.multiplex(yamux::Config::default())
|
||||
.boxed();
|
||||
(peer_id, transport)
|
||||
}
|
||||
|
||||
|
@ -45,6 +45,7 @@ aes-all = ["aesni"]
|
||||
[dev-dependencies]
|
||||
criterion = "0.2"
|
||||
libp2p-tcp = { version = "0.12.0", path = "../../transports/tcp" }
|
||||
libp2p-mplex = { version = "0.12.0", path = "../../muxers/mplex" }
|
||||
tokio = "0.1"
|
||||
tokio-tcp = "0.1"
|
||||
|
||||
|
@ -21,65 +21,54 @@
|
||||
//! The `secio` protocol is a middleware that will encrypt and decrypt communications going
|
||||
//! through a socket (or anything that implements `AsyncRead + AsyncWrite`).
|
||||
//!
|
||||
//! # Connection upgrade
|
||||
//! # Usage
|
||||
//!
|
||||
//! The `SecioConfig` struct implements the `ConnectionUpgrade` trait. You can apply it over a
|
||||
//! `Transport` by using the `with_upgrade` method. The returned object will also implement
|
||||
//! `Transport` and will automatically apply the secio protocol over any connection that is opened
|
||||
//! through it.
|
||||
//! The `SecioConfig` implements [`InboundUpgrade`] and [`OutboundUpgrade`] and thus
|
||||
//! serves as a connection upgrade for authentication of a transport.
|
||||
//! See [`authenticate`](libp2p_core::transport::upgrade::builder::Builder::authenticate).
|
||||
//!
|
||||
//! ```no_run
|
||||
//! # fn main() {
|
||||
//! use futures::Future;
|
||||
//! use libp2p_secio::{SecioConfig, SecioOutput};
|
||||
//! use libp2p_core::{Multiaddr, identity, upgrade::apply_inbound};
|
||||
//! use libp2p_core::{PeerId, Multiaddr, identity};
|
||||
//! use libp2p_core::transport::Transport;
|
||||
//! use libp2p_mplex::MplexConfig;
|
||||
//! use libp2p_tcp::TcpConfig;
|
||||
//! use tokio_io::io::write_all;
|
||||
//! use tokio::runtime::current_thread::Runtime;
|
||||
//!
|
||||
//! let dialer = TcpConfig::new()
|
||||
//! .with_upgrade({
|
||||
//! # let private_key = &mut [];
|
||||
//! // See the documentation of `identity::Keypair`.
|
||||
//! let keypair = identity::Keypair::rsa_from_pkcs8(private_key).unwrap();
|
||||
//! SecioConfig::new(keypair)
|
||||
//! })
|
||||
//! .map(|out: SecioOutput<_>, _| out.stream);
|
||||
//! // Create a local peer identity.
|
||||
//! let local_keys = identity::Keypair::generate_ed25519();
|
||||
//!
|
||||
//! let future = dialer.dial("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap())
|
||||
//! .unwrap()
|
||||
//! .map_err(|e| panic!("error: {:?}", e))
|
||||
//! .and_then(|connection| {
|
||||
//! // Sends "hello world" on the connection, will be encrypted.
|
||||
//! write_all(connection, "hello world")
|
||||
//! })
|
||||
//! .map_err(|e| panic!("error: {:?}", e));
|
||||
//! // Create a `Transport`.
|
||||
//! let transport = TcpConfig::new()
|
||||
//! .upgrade()
|
||||
//! .authenticate(SecioConfig::new(local_keys.clone()))
|
||||
//! .multiplex(MplexConfig::default());
|
||||
//!
|
||||
//! let mut rt = Runtime::new().unwrap();
|
||||
//! let _ = rt.block_on(future).unwrap();
|
||||
//! // The transport can be used with a `Network` from `libp2p-core`, or a
|
||||
//! // `Swarm` from from `libp2p-swarm`. See the documentation of these
|
||||
//! // crates for mode details.
|
||||
//!
|
||||
//! // let network = Network::new(transport, local_keys.public().into_peer_id());
|
||||
//! // let swarm = Swarm::new(transport, behaviour, local_keys.public().into_peer_id());
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! # Manual usage
|
||||
//!
|
||||
//! > **Note**: You are encouraged to use `SecioConfig` as described above.
|
||||
//!
|
||||
//! You can add the `secio` layer over a socket by calling `SecioMiddleware::handshake()`. This
|
||||
//! method will perform a handshake with the host, and return a future that corresponds to the
|
||||
//! moment when the handshake succeeds or errored. On success, the future produces a
|
||||
//! `SecioMiddleware` that implements `Sink` and `Stream` and can be used to send packets of data.
|
||||
//!
|
||||
|
||||
pub use self::error::SecioError;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use futures::stream::MapErr as StreamMapErr;
|
||||
use futures::{Future, Poll, Sink, StartSend, Stream};
|
||||
use libp2p_core::{PublicKey, identity, upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, Negotiated}};
|
||||
use libp2p_core::{
|
||||
PeerId,
|
||||
PublicKey,
|
||||
identity,
|
||||
upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, Negotiated}
|
||||
};
|
||||
use log::debug;
|
||||
use rw_stream_sink::RwStreamSink;
|
||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
use std::io;
|
||||
use std::iter;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
@ -145,7 +134,7 @@ impl SecioConfig {
|
||||
self
|
||||
}
|
||||
|
||||
fn handshake<T>(self, socket: T) -> impl Future<Item=SecioOutput<T>, Error=SecioError>
|
||||
fn handshake<T>(self, socket: T) -> impl Future<Item=(PeerId, SecioOutput<T>), Error=SecioError>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Send + 'static
|
||||
{
|
||||
@ -153,11 +142,13 @@ impl SecioConfig {
|
||||
SecioMiddleware::handshake(socket, self)
|
||||
.map(|(stream_sink, pubkey, ephemeral)| {
|
||||
let mapped = stream_sink.map_err(map_err as fn(_) -> _);
|
||||
SecioOutput {
|
||||
let peer = pubkey.clone().into_peer_id();
|
||||
let io = SecioOutput {
|
||||
stream: RwStreamSink::new(mapped),
|
||||
remote_key: pubkey,
|
||||
ephemeral_public_key: ephemeral
|
||||
}
|
||||
};
|
||||
(peer, io)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -168,7 +159,7 @@ where
|
||||
S: AsyncRead + AsyncWrite,
|
||||
{
|
||||
/// The encrypted stream.
|
||||
pub stream: RwStreamSink<StreamMapErr<SecioMiddleware<S>, fn(SecioError) -> IoError>>,
|
||||
pub stream: RwStreamSink<StreamMapErr<SecioMiddleware<S>, fn(SecioError) -> io::Error>>,
|
||||
/// The public key of the remote.
|
||||
pub remote_key: PublicKey,
|
||||
/// Ephemeral public key used during the negotiation.
|
||||
@ -188,7 +179,7 @@ impl<T> InboundUpgrade<T> for SecioConfig
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Send + 'static
|
||||
{
|
||||
type Output = SecioOutput<Negotiated<T>>;
|
||||
type Output = (PeerId, SecioOutput<Negotiated<T>>);
|
||||
type Error = SecioError;
|
||||
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
|
||||
|
||||
@ -201,7 +192,7 @@ impl<T> OutboundUpgrade<T> for SecioConfig
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Send + 'static
|
||||
{
|
||||
type Output = SecioOutput<Negotiated<T>>;
|
||||
type Output = (PeerId, SecioOutput<Negotiated<T>>);
|
||||
type Error = SecioError;
|
||||
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
|
||||
|
||||
@ -210,10 +201,37 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn map_err(err: SecioError) -> IoError {
|
||||
impl<S: AsyncRead + AsyncWrite> io::Read for SecioOutput<S> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.stream.read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite> AsyncRead for SecioOutput<S> {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
self.stream.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite> io::Write for SecioOutput<S> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.stream.write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.stream.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite> AsyncWrite for SecioOutput<S> {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
self.stream.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
fn map_err(err: SecioError) -> io::Error {
|
||||
debug!("error during secio handshake {:?}", err);
|
||||
IoError::new(IoErrorKind::InvalidData, err)
|
||||
io::Error::new(io::ErrorKind::InvalidData, err)
|
||||
}
|
||||
|
||||
/// Wraps around an object that implements `AsyncRead` and `AsyncWrite`.
|
||||
@ -247,7 +265,7 @@ where
|
||||
S: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type SinkItem = BytesMut;
|
||||
type SinkError = IoError;
|
||||
type SinkError = io::Error;
|
||||
|
||||
#[inline]
|
||||
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
|
||||
|
28
src/lib.rs
28
src/lib.rs
@ -82,15 +82,15 @@
|
||||
//! *upgraded*. Upgrading a transport is the process of negotiating an additional protocol
|
||||
//! with the remote, mediated through a negotiation protocol called [`multistream-select`].
|
||||
//!
|
||||
//! Example ([`secio`] Protocol Upgrade):
|
||||
//! Example ([`secio`] + [`yamux`] Protocol Upgrade):
|
||||
//!
|
||||
//! ```rust
|
||||
//! # #[cfg(all(not(any(target_os = "emscripten", target_os = "unknown")), feature = "libp2p-secio"))] {
|
||||
//! use libp2p::{Transport, tcp::TcpConfig, secio::SecioConfig, identity::Keypair};
|
||||
//! use libp2p::{Transport, tcp::TcpConfig, secio::SecioConfig, identity::Keypair, yamux};
|
||||
//! let tcp = TcpConfig::new();
|
||||
//! let secio_upgrade = SecioConfig::new(Keypair::generate_ed25519());
|
||||
//! let tcp_secio = tcp.with_upgrade(secio_upgrade);
|
||||
//! // let _ = tcp_secio.dial(...);
|
||||
//! let secio = SecioConfig::new(Keypair::generate_ed25519());
|
||||
//! let yamux = yamux::Config::default();
|
||||
//! let transport = tcp.upgrade().authenticate(secio).multiplex(yamux);
|
||||
//! # }
|
||||
//! ```
|
||||
//! In this example, `tcp_secio` is a new [`Transport`] that negotiates the secio protocol
|
||||
@ -222,7 +222,6 @@ pub use self::simple::SimpleProtocol;
|
||||
pub use self::swarm::Swarm;
|
||||
pub use self::transport_ext::TransportExt;
|
||||
|
||||
use futures::prelude::*;
|
||||
use std::{error, io, time::Duration};
|
||||
|
||||
/// Builds a `Transport` that supports the most commonly-used protocols that libp2p supports.
|
||||
@ -245,18 +244,11 @@ pub fn build_tcp_ws_secio_mplex_yamux(keypair: identity::Keypair)
|
||||
-> impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone
|
||||
{
|
||||
CommonTransport::new()
|
||||
.with_upgrade(secio::SecioConfig::new(keypair))
|
||||
.and_then(move |output, endpoint| {
|
||||
let peer_id = output.remote_key.into_peer_id();
|
||||
let peer_id2 = peer_id.clone();
|
||||
let upgrade = core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new())
|
||||
// TODO: use a single `.map` instead of two maps
|
||||
.map_inbound(move |muxer| (peer_id, muxer))
|
||||
.map_outbound(move |muxer| (peer_id2, muxer));
|
||||
core::upgrade::apply(output.stream, upgrade, endpoint)
|
||||
.map(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer)))
|
||||
})
|
||||
.with_timeout(Duration::from_secs(20))
|
||||
.upgrade()
|
||||
.authenticate(secio::SecioConfig::new(keypair))
|
||||
.multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()))
|
||||
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
|
||||
.timeout(Duration::from_secs(20))
|
||||
}
|
||||
|
||||
/// Implementation of `Transport` that supports the most common protocols.
|
||||
|
@ -18,7 +18,7 @@ rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" }
|
||||
tokio-codec = "0.1.1"
|
||||
tokio-io = "0.1.12"
|
||||
tokio-rustls = "0.10.0-alpha.3"
|
||||
soketto = { version = "0.2.0", features = ["deflate"] }
|
||||
soketto = { version = "0.2.3", features = ["deflate"] }
|
||||
url = "1.7.2"
|
||||
webpki-roots = "0.16.0"
|
||||
|
||||
|
Reference in New Issue
Block a user