mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-04-25 11:02:12 +00:00
Add a IdentifyTransport (#569)
* Add a IdentifyTransport * Retreiver -> Retriever * Move the muxer in the IdRetrieverState
This commit is contained in:
parent
437a8c0fde
commit
4225d2631b
229
protocols/identify/src/id_transport.rs
Normal file
229
protocols/identify/src/id_transport.rs
Normal file
@ -0,0 +1,229 @@
|
|||||||
|
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
//! Contains the `IdentifyTransport` type.
|
||||||
|
|
||||||
|
use futures::prelude::*;
|
||||||
|
use libp2p_core::{Endpoint, Multiaddr, PeerId, PublicKey, Transport, muxing, upgrade::apply};
|
||||||
|
use protocol::{IdentifyOutput, IdentifyProtocolConfig};
|
||||||
|
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||||
|
use std::mem;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
/// Wraps around an implementation of `Transport` that yields a muxer. Will use the muxer to
|
||||||
|
/// open a substream with the remote and retreive its peer id. Then yields a
|
||||||
|
/// `(PeerId, impl StreamMuxer)`.
|
||||||
|
///
|
||||||
|
/// This transport can be used if you don't use any encryption layer, or if you want to make
|
||||||
|
/// encryption optional, in which case you have no other way to know the `PeerId` of the remote
|
||||||
|
/// than to ask for it.
|
||||||
|
///
|
||||||
|
/// > **Note**: If you use this transport, keep in mind that the `PeerId` returned by the remote
|
||||||
|
/// > can be anything and shouldn't necessarily be trusted.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct IdentifyTransport<TTrans> {
|
||||||
|
/// The underlying transport we wrap around.
|
||||||
|
transport: TTrans,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TTrans> IdentifyTransport<TTrans> {
|
||||||
|
/// Creates an `IdentifyTransport` that wraps around the given transport.
|
||||||
|
#[inline]
|
||||||
|
pub fn new(transport: TTrans) -> Self {
|
||||||
|
IdentifyTransport {
|
||||||
|
transport,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: don't use boxes
|
||||||
|
impl<TTrans, TMuxer> Transport for IdentifyTransport<TTrans>
|
||||||
|
where
|
||||||
|
TTrans: Transport<Output = TMuxer>,
|
||||||
|
TMuxer: muxing::StreamMuxer + Send + Sync + 'static, // TODO: remove unnecessary bounds
|
||||||
|
TMuxer::Substream: Send + Sync + 'static, // TODO: remove unnecessary bounds
|
||||||
|
TMuxer::OutboundSubstream: Send + 'static, // TODO: remove unnecessary bounds
|
||||||
|
TTrans::Dial: Send + Sync + 'static,
|
||||||
|
TTrans::Listener: Send + 'static,
|
||||||
|
TTrans::ListenerUpgrade: Send + 'static,
|
||||||
|
{
|
||||||
|
type Output = (PeerId, TMuxer);
|
||||||
|
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError> + Send>;
|
||||||
|
type ListenerUpgrade = Box<Future<Item = Self::Output, Error = IoError> + Send>;
|
||||||
|
type Dial = Box<Future<Item = Self::Output, Error = IoError> + Send>;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
|
let (listener, new_addr) = match self.transport.listen_on(addr) {
|
||||||
|
Ok((l, a)) => (l, a),
|
||||||
|
Err((inner, addr)) => {
|
||||||
|
let id = IdentifyTransport {
|
||||||
|
transport: inner,
|
||||||
|
};
|
||||||
|
return Err((id, addr));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let listener = listener
|
||||||
|
.map(move |(upgrade, remote_addr)| {
|
||||||
|
let upgr = upgrade
|
||||||
|
.and_then(move |muxer| {
|
||||||
|
IdRetriever::new(muxer, IdentifyProtocolConfig, Endpoint::Listener)
|
||||||
|
});
|
||||||
|
(Box::new(upgr) as Box<Future<Item = _, Error = _> + Send>, remote_addr)
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok((Box::new(listener) as Box<_>, new_addr))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||||
|
// We dial a first time the node.
|
||||||
|
let dial = match self.transport.dial(addr.clone()) {
|
||||||
|
Ok(d) => d,
|
||||||
|
Err((transport, addr)) => {
|
||||||
|
let id = IdentifyTransport {
|
||||||
|
transport,
|
||||||
|
};
|
||||||
|
return Err((id, addr));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let dial = dial.and_then(move |muxer| {
|
||||||
|
IdRetriever::new(muxer, IdentifyProtocolConfig, Endpoint::Dialer)
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(Box::new(dial) as Box<_>)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option<Multiaddr> {
|
||||||
|
self.transport.nat_traversal(a, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Implementation of `Future` that asks the remote of its `PeerId`.
|
||||||
|
// TODO: remove unneeded bounds
|
||||||
|
struct IdRetriever<TMuxer>
|
||||||
|
where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
|
||||||
|
TMuxer::Substream: Send,
|
||||||
|
{
|
||||||
|
/// Internal state.
|
||||||
|
state: IdRetrieverState<TMuxer>,
|
||||||
|
/// Whether we're dialing or listening.
|
||||||
|
endpoint: Endpoint,
|
||||||
|
}
|
||||||
|
|
||||||
|
enum IdRetrieverState<TMuxer>
|
||||||
|
where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
|
||||||
|
TMuxer::Substream: Send,
|
||||||
|
{
|
||||||
|
/// We are in the process of opening a substream with the remote.
|
||||||
|
OpeningSubstream(Arc<TMuxer>, muxing::OutboundSubstreamRefWrapFuture<Arc<TMuxer>>, IdentifyProtocolConfig),
|
||||||
|
/// We opened the substream and are currently negotiating the identify protocol.
|
||||||
|
NegotiatingIdentify(Arc<TMuxer>, apply::UpgradeApplyFuture<muxing::SubstreamRef<Arc<TMuxer>>, IdentifyProtocolConfig>),
|
||||||
|
/// We retreived the remote's public key and are ready to yield it when polled again.
|
||||||
|
Finishing(Arc<TMuxer>, PublicKey),
|
||||||
|
/// Something bad happend, or the `Future` is finished, and shouldn't be polled again.
|
||||||
|
Poisoned,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TMuxer> IdRetriever<TMuxer>
|
||||||
|
where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
|
||||||
|
TMuxer::Substream: Send,
|
||||||
|
{
|
||||||
|
/// Creates a new `IdRetriever` ready to be polled.
|
||||||
|
fn new(muxer: TMuxer, config: IdentifyProtocolConfig, endpoint: Endpoint) -> Self {
|
||||||
|
let muxer = Arc::new(muxer);
|
||||||
|
let opening = muxing::outbound_from_ref_and_wrap(muxer.clone());
|
||||||
|
|
||||||
|
IdRetriever {
|
||||||
|
state: IdRetrieverState::OpeningSubstream(muxer, opening, config),
|
||||||
|
endpoint,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TMuxer> Future for IdRetriever<TMuxer>
|
||||||
|
where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
|
||||||
|
TMuxer::Substream: Send,
|
||||||
|
{
|
||||||
|
type Item = (PeerId, TMuxer);
|
||||||
|
type Error = IoError;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
// This loop is here so that we can continue polling until we're ready.
|
||||||
|
loop {
|
||||||
|
// In order to satisfy the borrow checker, we extract the state and temporarily put
|
||||||
|
// `Poisoned` instead.
|
||||||
|
match mem::replace(&mut self.state, IdRetrieverState::Poisoned) {
|
||||||
|
IdRetrieverState::OpeningSubstream(muxer, mut opening, config) => {
|
||||||
|
match opening.poll() {
|
||||||
|
Ok(Async::Ready(Some(substream))) => {
|
||||||
|
let upgrade = apply::apply(substream, config, self.endpoint);
|
||||||
|
self.state = IdRetrieverState::NegotiatingIdentify(muxer, upgrade);
|
||||||
|
},
|
||||||
|
Ok(Async::Ready(None)) => {
|
||||||
|
return Err(IoError::new(IoErrorKind::Other, "remote refused our identify attempt"));
|
||||||
|
},
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
self.state = IdRetrieverState::OpeningSubstream(muxer, opening, config);
|
||||||
|
return Ok(Async::NotReady);
|
||||||
|
},
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
IdRetrieverState::NegotiatingIdentify(muxer, mut nego) => {
|
||||||
|
match nego.poll() {
|
||||||
|
Ok(Async::Ready(IdentifyOutput::RemoteInfo { info, .. })) => {
|
||||||
|
self.state = IdRetrieverState::Finishing(muxer, info.public_key);
|
||||||
|
},
|
||||||
|
Ok(Async::Ready(IdentifyOutput::Sender { .. })) => {
|
||||||
|
unreachable!("IdentifyOutput::Sender can never be the output from \
|
||||||
|
the dialing side");
|
||||||
|
},
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
self.state = IdRetrieverState::NegotiatingIdentify(muxer, nego);
|
||||||
|
return Ok(Async::NotReady);
|
||||||
|
},
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
IdRetrieverState::Finishing(muxer, public_key) => {
|
||||||
|
// Here is a tricky part: we need to get back the muxer in order to return
|
||||||
|
// it, but it is in an `Arc`.
|
||||||
|
let unwrapped = Arc::try_unwrap(muxer).unwrap_or_else(|_| {
|
||||||
|
panic!("we clone the Arc only to put it into substreams ; once in the \
|
||||||
|
Finishing state, no substream or upgrade exists anymore ; \
|
||||||
|
therefore there exists only one instance of the Arc ; qed")
|
||||||
|
});
|
||||||
|
|
||||||
|
// We leave `Poisoned` as the state when returning.
|
||||||
|
return Ok(Async::Ready((public_key.into(), unwrapped)));
|
||||||
|
},
|
||||||
|
IdRetrieverState::Poisoned => {
|
||||||
|
panic!("Future state panicked inside poll() or is finished")
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: write basic working test
|
@ -81,10 +81,12 @@ extern crate tokio_timer;
|
|||||||
extern crate unsigned_varint;
|
extern crate unsigned_varint;
|
||||||
extern crate void;
|
extern crate void;
|
||||||
|
|
||||||
|
pub use self::id_transport::IdentifyTransport;
|
||||||
pub use self::periodic_id_handler::{PeriodicIdentification, PeriodicIdentificationEvent};
|
pub use self::periodic_id_handler::{PeriodicIdentification, PeriodicIdentificationEvent};
|
||||||
pub use self::protocol::{IdentifyInfo, IdentifyOutput};
|
pub use self::protocol::{IdentifyInfo, IdentifyOutput};
|
||||||
pub use self::protocol::{IdentifyProtocolConfig, IdentifySender};
|
pub use self::protocol::{IdentifyProtocolConfig, IdentifySender};
|
||||||
|
|
||||||
|
mod id_transport;
|
||||||
mod periodic_id_handler;
|
mod periodic_id_handler;
|
||||||
mod protocol;
|
mod protocol;
|
||||||
mod structs_proto;
|
mod structs_proto;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user