diff --git a/protocols/identify/src/id_transport.rs b/protocols/identify/src/id_transport.rs deleted file mode 100644 index d6549610..00000000 --- a/protocols/identify/src/id_transport.rs +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Contains the `IdentifyTransport` type. - -use crate::protocol::{RemoteInfo, IdentifyProtocolConfig}; -use futures::{future, prelude::*, stream, AndThen, MapErr}; -use libp2p_core::{ - Multiaddr, PeerId, PublicKey, muxing, Transport, - transport::{TransportError, ListenerEvent, upgrade::TransportUpgradeError}, - upgrade::{self, OutboundUpgradeApply, UpgradeError} -}; -use std::io::Error as IoError; -use std::mem; -use std::sync::Arc; - -/// Wraps around an implementation of `Transport` that yields a muxer. Will use the muxer to -/// open a substream with the remote and retreive its peer id. Then yields a -/// `(PeerId, impl StreamMuxer)`. -/// -/// This transport can be used if you don't use any encryption layer, or if you want to make -/// encryption optional, in which case you have no other way to know the `PeerId` of the remote -/// than to ask for it. -/// -/// > **Note**: If you use this transport, keep in mind that the `PeerId` returned by the remote -/// > can be anything and shouldn't necessarily be trusted. -#[derive(Debug, Clone)] -pub struct IdentifyTransport { - /// The underlying transport we wrap around. - transport: TTrans, -} - -impl IdentifyTransport { - /// Creates an `IdentifyTransport` that wraps around the given transport. - #[inline] - pub fn new(transport: TTrans) -> Self { - IdentifyTransport { - transport, - } - } -} - -impl Transport for IdentifyTransport -where - TTrans: Transport, - TTrans::Error: 'static, - TMuxer: muxing::StreamMuxer + Send + Sync + 'static, // TODO: remove unnecessary bounds - TMuxer::Substream: Send + Sync + 'static, // TODO: remove unnecessary bounds -{ - type Output = (PeerId, TMuxer); - type Error = TransportUpgradeError; // TODO: better than IoError - type Listener = stream::Empty, Self::Error>; - type ListenerUpgrade = future::Empty; - type Dial = AndThen< - MapErr Self::Error>, - MapErr, fn(UpgradeError) -> Self::Error>, - fn(TMuxer) -> MapErr, fn(UpgradeError) -> Self::Error> - >; - - fn listen_on(self, addr: Multiaddr) -> Result> { - Err(TransportError::MultiaddrNotSupported(addr)) - } - - fn dial(self, addr: Multiaddr) -> Result> { - // We dial a first time the node. - let dial = self.transport.dial(addr) - .map_err(|err| err.map(TransportUpgradeError::Transport))?; - Ok(dial.map_err:: _, _>(TransportUpgradeError::Transport).and_then(|muxer| { - IdRetriever::new(muxer, IdentifyProtocolConfig).map_err(TransportUpgradeError::Upgrade) - })) - } -} - -/// Implementation of `Future` that asks the remote of its `PeerId`. -// TODO: remove unneeded bounds -pub struct IdRetriever -where TMuxer: muxing::StreamMuxer + Send + Sync + 'static, - TMuxer::Substream: Send, -{ - /// Internal state. - state: IdRetrieverState -} - -enum IdRetrieverState -where TMuxer: muxing::StreamMuxer + Send + Sync + 'static, - TMuxer::Substream: Send, -{ - /// We are in the process of opening a substream with the remote. - OpeningSubstream(Arc, muxing::OutboundSubstreamRefWrapFuture>, IdentifyProtocolConfig), - /// We opened the substream and are currently negotiating the identify protocol. - NegotiatingIdentify(Arc, OutboundUpgradeApply>, IdentifyProtocolConfig>), - /// We retreived the remote's public key and are ready to yield it when polled again. - Finishing(Arc, PublicKey), - /// Something bad happend, or the `Future` is finished, and shouldn't be polled again. - Poisoned, -} - -impl IdRetriever -where TMuxer: muxing::StreamMuxer + Send + Sync + 'static, - TMuxer::Substream: Send, -{ - /// Creates a new `IdRetriever` ready to be polled. - fn new(muxer: TMuxer, config: IdentifyProtocolConfig) -> Self { - let muxer = Arc::new(muxer); - let opening = muxing::outbound_from_ref_and_wrap(muxer.clone()); - - IdRetriever { - state: IdRetrieverState::OpeningSubstream(muxer, opening, config) - } - } -} - -impl Future for IdRetriever -where TMuxer: muxing::StreamMuxer + Send + Sync + 'static, - TMuxer::Substream: Send, -{ - type Item = (PeerId, TMuxer); - type Error = UpgradeError; - - fn poll(&mut self) -> Poll { - // This loop is here so that we can continue polling until we're ready. - loop { - // In order to satisfy the borrow checker, we extract the state and temporarily put - // `Poisoned` instead. - match mem::replace(&mut self.state, IdRetrieverState::Poisoned) { - IdRetrieverState::OpeningSubstream(muxer, mut opening, config) => { - match opening.poll() { - Ok(Async::Ready(substream)) => { - let upgrade = upgrade::apply_outbound(substream, config); - self.state = IdRetrieverState::NegotiatingIdentify(muxer, upgrade) - }, - Ok(Async::NotReady) => { - self.state = IdRetrieverState::OpeningSubstream(muxer, opening, config); - return Ok(Async::NotReady); - }, - Err(err) => return Err(UpgradeError::Apply(err.into())) - } - }, - IdRetrieverState::NegotiatingIdentify(muxer, mut nego) => { - match nego.poll() { - Ok(Async::Ready(RemoteInfo { info, .. })) => { - self.state = IdRetrieverState::Finishing(muxer, info.public_key); - }, - Ok(Async::NotReady) => { - self.state = IdRetrieverState::NegotiatingIdentify(muxer, nego); - return Ok(Async::NotReady); - }, - Err(err) => return Err(err), - } - }, - IdRetrieverState::Finishing(muxer, public_key) => { - // Here is a tricky part: we need to get back the muxer in order to return - // it, but it is in an `Arc`. - let unwrapped = Arc::try_unwrap(muxer).unwrap_or_else(|_| { - panic!("We clone the Arc only to put it into substreams. Once in the \ - Finishing state, no substream or upgrade exists anymore. \ - Therefore, there exists only one instance of the Arc. QED") - }); - - // We leave `Poisoned` as the state when returning. - return Ok(Async::Ready((public_key.into(), unwrapped))); - }, - IdRetrieverState::Poisoned => { - panic!("Future state panicked inside poll() or is finished") - }, - } - } - } -} - -// TODO: write basic working test diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index 48a9b082..82e02b62 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -66,7 +66,6 @@ //! it. pub use self::identify::{Identify, IdentifyEvent}; -pub use self::id_transport::IdentifyTransport; pub use self::protocol::IdentifyInfo; pub mod listen_handler; @@ -74,5 +73,4 @@ pub mod periodic_id_handler; pub mod protocol; mod identify; -mod id_transport; mod structs_proto;