mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-09 16:11:20 +00:00
Remove other unused files (#570)
This commit is contained in:
parent
a77c1a6bf7
commit
1e223f627d
@ -1,333 +0,0 @@
|
|||||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
|
||||||
//
|
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
||||||
// copy of this software and associated documentation files (the "Software"),
|
|
||||||
// to deal in the Software without restriction, including without limitation
|
|
||||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
||||||
// and/or sell copies of the Software, and to permit persons to whom the
|
|
||||||
// Software is furnished to do so, subject to the following conditions:
|
|
||||||
//
|
|
||||||
// The above copyright notice and this permission notice shall be included in
|
|
||||||
// all copies or substantial portions of the Software.
|
|
||||||
//
|
|
||||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
||||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
||||||
// DEALINGS IN THE SOFTWARE.
|
|
||||||
|
|
||||||
use fnv::FnvHashMap;
|
|
||||||
use futures::{future, Future, Stream};
|
|
||||||
use libp2p_core::{Multiaddr, MuxedTransport, Transport};
|
|
||||||
use parking_lot::Mutex;
|
|
||||||
use protocol::{IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig};
|
|
||||||
use std::collections::hash_map::Entry;
|
|
||||||
use std::error::Error;
|
|
||||||
use std::io::Error as IoError;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
|
||||||
|
|
||||||
/// Implementation of `Transport`. See [the crate root description](index.html).
|
|
||||||
pub struct IdentifyTransport<Trans> {
|
|
||||||
transport: Trans,
|
|
||||||
// Each entry is protected by an asynchronous mutex, so that if we dial the same node twice
|
|
||||||
// simultaneously, the second time will block until the first time has identified it.
|
|
||||||
cache: Arc<Mutex<FnvHashMap<Multiaddr, CacheEntry>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Trans> Clone for IdentifyTransport<Trans>
|
|
||||||
where Trans: Clone,
|
|
||||||
{
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
IdentifyTransport {
|
|
||||||
transport: self.transport.clone(),
|
|
||||||
cache: self.cache.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type CacheEntry = future::Shared<Box<Future<Item = IdentifyTransportOutcome, Error = IoError> + Send>>;
|
|
||||||
|
|
||||||
impl<Trans> IdentifyTransport<Trans> {
|
|
||||||
/// Creates an `IdentifyTransport` that wraps around the given transport and peerstore.
|
|
||||||
#[inline]
|
|
||||||
pub fn new(transport: Trans) -> Self {
|
|
||||||
IdentifyTransport {
|
|
||||||
transport,
|
|
||||||
cache: Arc::new(Mutex::new(Default::default())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Trans> Transport for IdentifyTransport<Trans>
|
|
||||||
where
|
|
||||||
Trans: Transport + Clone + Send + 'static, // TODO: 'static :(
|
|
||||||
Trans::Dial: Send,
|
|
||||||
Trans::Listener: Send,
|
|
||||||
Trans::ListenerUpgrade: Send,
|
|
||||||
Trans::MultiaddrFuture: Send,
|
|
||||||
Trans::Output: AsyncRead + AsyncWrite + Send,
|
|
||||||
{
|
|
||||||
type Output = IdentifyTransportOutput<Trans::Output>;
|
|
||||||
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
|
|
||||||
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
|
|
||||||
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
|
|
||||||
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
|
||||||
let (listener, new_addr) = match self.transport.clone().listen_on(addr.clone()) {
|
|
||||||
Ok((l, a)) => (l, a),
|
|
||||||
Err((inner, addr)) => {
|
|
||||||
let id = IdentifyTransport {
|
|
||||||
transport: inner,
|
|
||||||
cache: self.cache,
|
|
||||||
};
|
|
||||||
return Err((id, addr));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let identify_upgrade = self.transport.with_upgrade(IdentifyProtocolConfig);
|
|
||||||
let cache = self.cache.clone();
|
|
||||||
|
|
||||||
let listener = listener.map(move |connec| {
|
|
||||||
let identify_upgrade = identify_upgrade.clone();
|
|
||||||
let cache = cache.clone();
|
|
||||||
let fut = connec
|
|
||||||
.and_then(move |(connec, client_addr)| {
|
|
||||||
trace!("Incoming connection, waiting for client address");
|
|
||||||
client_addr.map(move |addr| (connec, addr))
|
|
||||||
})
|
|
||||||
.and_then(move |(connec, client_addr)| {
|
|
||||||
debug!("Incoming connection from {}", client_addr);
|
|
||||||
|
|
||||||
// Dial the address that connected to us and try upgrade with the
|
|
||||||
// identify protocol.
|
|
||||||
let info_future = cache_entry(&cache, client_addr.clone(), { let client_addr = client_addr.clone(); move || {
|
|
||||||
debug!("No cache entry for {}, dialing back in order to identify", client_addr);
|
|
||||||
future::lazy(|| { trace!("Starting identify back"); identify_upgrade
|
|
||||||
.dial(client_addr)
|
|
||||||
.unwrap_or_else(|(_, addr)| {
|
|
||||||
panic!("the multiaddr {} was determined to be valid earlier", addr)
|
|
||||||
}) })
|
|
||||||
.map(move |(identify, _)| {
|
|
||||||
let (info, observed_addr) = match identify {
|
|
||||||
IdentifyOutput::RemoteInfo { info, observed_addr } => {
|
|
||||||
(info, observed_addr)
|
|
||||||
},
|
|
||||||
_ => unreachable!(
|
|
||||||
"the identify protocol guarantees that we receive \
|
|
||||||
remote information when we dial a node"
|
|
||||||
),
|
|
||||||
};
|
|
||||||
|
|
||||||
debug!("Identified dialed back connection as pubkey {:?}", info.public_key);
|
|
||||||
IdentifyTransportOutcome {
|
|
||||||
info,
|
|
||||||
observed_addr,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.map_err(move |err| {
|
|
||||||
debug!("Failed to identify dialed back connection");
|
|
||||||
err
|
|
||||||
})
|
|
||||||
}});
|
|
||||||
|
|
||||||
let out = IdentifyTransportOutput {
|
|
||||||
socket: connec,
|
|
||||||
info: Box::new(info_future),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((out, future::ok(client_addr)))
|
|
||||||
});
|
|
||||||
|
|
||||||
Box::new(fut) as Box<Future<Item = _, Error = _> + Send>
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok((Box::new(listener) as Box<_>, new_addr))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
|
||||||
// We dial a first time the node.
|
|
||||||
let dial = match self.transport.clone().dial(addr) {
|
|
||||||
Ok(d) => d,
|
|
||||||
Err((transport, addr)) => {
|
|
||||||
let id = IdentifyTransport {
|
|
||||||
transport,
|
|
||||||
cache: self.cache,
|
|
||||||
};
|
|
||||||
return Err((id, addr));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Once successfully dialed, we dial again to identify.
|
|
||||||
let identify_upgrade = self.transport.with_upgrade(IdentifyProtocolConfig);
|
|
||||||
let cache = self.cache.clone();
|
|
||||||
let future = dial
|
|
||||||
.and_then(move |(connec, client_addr)| {
|
|
||||||
trace!("Dialing successful, waiting for client address");
|
|
||||||
client_addr.map(move |addr| (connec, addr))
|
|
||||||
})
|
|
||||||
.and_then(move |(socket, addr)| {
|
|
||||||
trace!("Dialing successful ; client address is {}", addr);
|
|
||||||
let info_future = cache_entry(&cache, addr.clone(), { let addr = addr.clone(); move || {
|
|
||||||
trace!("No cache entry for {} ; dialing again for identification", addr);
|
|
||||||
future::lazy(|| { trace!("Starting identify back"); identify_upgrade
|
|
||||||
.dial(addr)
|
|
||||||
.unwrap_or_else(|(_, addr)| {
|
|
||||||
panic!("the multiaddr {} was determined to be valid earlier", addr)
|
|
||||||
}) })
|
|
||||||
.map(move |(identify, _)| {
|
|
||||||
let (info, observed_addr) = match identify {
|
|
||||||
IdentifyOutput::RemoteInfo { info, observed_addr } => {
|
|
||||||
(info, observed_addr)
|
|
||||||
}
|
|
||||||
_ => unreachable!(
|
|
||||||
"the identify protocol guarantees that we receive \
|
|
||||||
remote information when we dial a node"
|
|
||||||
),
|
|
||||||
};
|
|
||||||
|
|
||||||
IdentifyTransportOutcome {
|
|
||||||
info,
|
|
||||||
observed_addr,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}});
|
|
||||||
|
|
||||||
let out = IdentifyTransportOutput {
|
|
||||||
socket: socket,
|
|
||||||
info: Box::new(info_future),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((out, future::ok(addr)))
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(Box::new(future) as Box<_>)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option<Multiaddr> {
|
|
||||||
self.transport.nat_traversal(a, b)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Trans> MuxedTransport for IdentifyTransport<Trans>
|
|
||||||
where
|
|
||||||
Trans: MuxedTransport + Clone + Send + 'static,
|
|
||||||
Trans::Dial: Send,
|
|
||||||
Trans::Listener: Send,
|
|
||||||
Trans::ListenerUpgrade: Send,
|
|
||||||
Trans::MultiaddrFuture: Send,
|
|
||||||
Trans::Output: AsyncRead + AsyncWrite + Send,
|
|
||||||
Trans::Incoming: Send,
|
|
||||||
Trans::IncomingUpgrade: Send,
|
|
||||||
{
|
|
||||||
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError> + Send>;
|
|
||||||
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn next_incoming(self) -> Self::Incoming {
|
|
||||||
let identify_upgrade = self.transport.clone().with_upgrade(IdentifyProtocolConfig);
|
|
||||||
let cache = self.cache.clone();
|
|
||||||
|
|
||||||
let future = self.transport.next_incoming().map(move |incoming| {
|
|
||||||
let cache = cache.clone();
|
|
||||||
let future = incoming
|
|
||||||
.and_then(move |(connec, client_addr)| {
|
|
||||||
debug!("Incoming substream ; waiting for client address");
|
|
||||||
client_addr.map(move |addr| (connec, addr))
|
|
||||||
})
|
|
||||||
.and_then(move |(connec, client_addr)| {
|
|
||||||
debug!("Incoming substream from {}", client_addr);
|
|
||||||
|
|
||||||
// Dial the address that connected to us and try upgrade with the
|
|
||||||
// identify protocol.
|
|
||||||
let info_future = cache_entry(&cache, client_addr.clone(), { let client_addr = client_addr.clone(); move || {
|
|
||||||
debug!("No cache entry from {} ; dialing back to identify", client_addr);
|
|
||||||
future::lazy(|| { trace!("Starting identify back"); identify_upgrade
|
|
||||||
.dial(client_addr)
|
|
||||||
.unwrap_or_else(|(_, client_addr)| {
|
|
||||||
panic!("the multiaddr {} was determined to be valid earlier", client_addr)
|
|
||||||
}) })
|
|
||||||
.map(move |(identify, _)| {
|
|
||||||
let (info, observed_addr) = match identify {
|
|
||||||
IdentifyOutput::RemoteInfo { info, observed_addr } => {
|
|
||||||
(info, observed_addr)
|
|
||||||
},
|
|
||||||
_ => unreachable!(
|
|
||||||
"the identify protocol guarantees that we receive \
|
|
||||||
remote information when we dial a node"
|
|
||||||
),
|
|
||||||
};
|
|
||||||
|
|
||||||
debug!("Identified incoming substream as pubkey {:?}", info.public_key);
|
|
||||||
IdentifyTransportOutcome {
|
|
||||||
info,
|
|
||||||
observed_addr,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.map_err(move |err| {
|
|
||||||
debug!("Failed to identify incoming substream");
|
|
||||||
err
|
|
||||||
})
|
|
||||||
}});
|
|
||||||
|
|
||||||
let out = IdentifyTransportOutput {
|
|
||||||
socket: connec,
|
|
||||||
info: Box::new(info_future),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((out, future::ok(client_addr)))
|
|
||||||
});
|
|
||||||
|
|
||||||
Box::new(future) as Box<Future<Item = _, Error = _> + Send>
|
|
||||||
});
|
|
||||||
|
|
||||||
Box::new(future) as Box<_>
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Output of the identify transport.
|
|
||||||
pub struct IdentifyTransportOutput<S> {
|
|
||||||
/// The socket to communicate with the remote.
|
|
||||||
pub socket: S,
|
|
||||||
/// Outcome of the identification of the remote.
|
|
||||||
pub info: Box<Future<Item = IdentifyTransportOutcome, Error = IoError> + Send>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Outcome of the identification of the remote.
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct IdentifyTransportOutcome {
|
|
||||||
/// Identification of the remote.
|
|
||||||
pub info: IdentifyInfo,
|
|
||||||
/// Address the remote sees for us.
|
|
||||||
pub observed_addr: Multiaddr,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn cache_entry<F, Fut>(cache: &Mutex<FnvHashMap<Multiaddr, CacheEntry>>, addr: Multiaddr, if_no_entry: F)
|
|
||||||
-> impl Future<Item = IdentifyTransportOutcome, Error = IoError>
|
|
||||||
where F: FnOnce() -> Fut,
|
|
||||||
Fut: Future<Item = IdentifyTransportOutcome, Error = IoError> + Send + 'static,
|
|
||||||
{
|
|
||||||
trace!("Looking up cache entry for {}", addr);
|
|
||||||
let mut cache = cache.lock();
|
|
||||||
match cache.entry(addr) {
|
|
||||||
Entry::Occupied(entry) => {
|
|
||||||
trace!("Cache entry found, cloning");
|
|
||||||
future::Either::A(entry.get().clone())
|
|
||||||
},
|
|
||||||
|
|
||||||
Entry::Vacant(entry) => {
|
|
||||||
trace!("No cache entry available");
|
|
||||||
let future = (Box::new(if_no_entry()) as Box<Future<Item = _, Error = _> + Send>).shared();
|
|
||||||
entry.insert(future.clone());
|
|
||||||
future::Either::B(future)
|
|
||||||
},
|
|
||||||
}.map(|out| (*out).clone()).map_err(|err| IoError::new(err.kind(), err.description()))
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: test that we receive back what the remote sent us
|
|
@ -1,370 +0,0 @@
|
|||||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
|
||||||
//
|
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
||||||
// copy of this software and associated documentation files (the "Software"),
|
|
||||||
// to deal in the Software without restriction, including without limitation
|
|
||||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
||||||
// and/or sell copies of the Software, and to permit persons to whom the
|
|
||||||
// Software is furnished to do so, subject to the following conditions:
|
|
||||||
//
|
|
||||||
// The above copyright notice and this permission notice shall be included in
|
|
||||||
// all copies or substantial portions of the Software.
|
|
||||||
//
|
|
||||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
||||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
||||||
// DEALINGS IN THE SOFTWARE.
|
|
||||||
|
|
||||||
use futures::{future, stream, Future, Stream};
|
|
||||||
use identify_transport::{IdentifyTransport, IdentifyTransportOutcome};
|
|
||||||
use libp2p_core::{PeerId, MuxedTransport, Transport};
|
|
||||||
use multiaddr::{Protocol, Multiaddr};
|
|
||||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
|
||||||
|
|
||||||
/// Implementation of `Transport`. See [the crate root description](index.html).
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct PeerIdTransport<Trans, AddrRes> {
|
|
||||||
transport: IdentifyTransport<Trans>,
|
|
||||||
addr_resolver: AddrRes,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Trans, AddrRes> PeerIdTransport<Trans, AddrRes> {
|
|
||||||
/// Creates an `PeerIdTransport` that wraps around the given transport and address resolver.
|
|
||||||
#[inline]
|
|
||||||
pub fn new(transport: Trans, addr_resolver: AddrRes) -> Self {
|
|
||||||
PeerIdTransport {
|
|
||||||
transport: IdentifyTransport::new(transport),
|
|
||||||
addr_resolver,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Trans, AddrRes, AddrResOut> Transport for PeerIdTransport<Trans, AddrRes>
|
|
||||||
where
|
|
||||||
Trans: Transport + Clone + Send + 'static, // TODO: 'static :(
|
|
||||||
Trans::Dial: Send,
|
|
||||||
Trans::Listener: Send,
|
|
||||||
Trans::ListenerUpgrade: Send,
|
|
||||||
Trans::MultiaddrFuture: Send,
|
|
||||||
Trans::Output: AsyncRead + AsyncWrite + Send,
|
|
||||||
AddrRes: Fn(PeerId) -> AddrResOut + 'static, // TODO: 'static :(
|
|
||||||
AddrResOut: IntoIterator<Item = Multiaddr> + 'static, // TODO: 'static :(
|
|
||||||
AddrResOut::IntoIter: Send,
|
|
||||||
{
|
|
||||||
type Output = PeerIdTransportOutput<Trans::Output>;
|
|
||||||
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = IoError> + Send>;
|
|
||||||
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
|
|
||||||
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
|
|
||||||
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
|
||||||
// Note that `listen_on` expects a "regular" multiaddr (eg. `/ip/.../tcp/...`),
|
|
||||||
// and not `/p2p/<foo>`.
|
|
||||||
|
|
||||||
let (listener, listened_addr) = match self.transport.listen_on(addr) {
|
|
||||||
Ok((listener, addr)) => (listener, addr),
|
|
||||||
Err((inner, addr)) => {
|
|
||||||
let id = PeerIdTransport {
|
|
||||||
transport: inner,
|
|
||||||
addr_resolver: self.addr_resolver,
|
|
||||||
};
|
|
||||||
|
|
||||||
return Err((id, addr));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let listener = listener.map(move |connec| {
|
|
||||||
let fut = connec
|
|
||||||
.and_then(move |(connec, client_addr)| {
|
|
||||||
client_addr.map(move |addr| (connec, addr))
|
|
||||||
})
|
|
||||||
.map(move |(connec, original_addr)| {
|
|
||||||
debug!("Successfully incoming connection from {}", original_addr);
|
|
||||||
let info = connec.info.shared();
|
|
||||||
let out = PeerIdTransportOutput {
|
|
||||||
socket: connec.socket,
|
|
||||||
info: Box::new(info.clone()
|
|
||||||
.map(move |info| (*info).clone())
|
|
||||||
.map_err(move |err| { let k = err.kind(); IoError::new(k, err) })),
|
|
||||||
original_addr: original_addr.clone(),
|
|
||||||
};
|
|
||||||
let real_addr = Box::new(info
|
|
||||||
.map_err(move |err| { let k = err.kind(); IoError::new(k, err) })
|
|
||||||
.map(move |info| {
|
|
||||||
let peer_id = info.info.public_key.clone().into_peer_id();
|
|
||||||
debug!("Identified {} as {:?}", original_addr, peer_id);
|
|
||||||
Protocol::P2p(peer_id.into()).into()
|
|
||||||
})) as Box<Future<Item = _, Error = _> + Send>;
|
|
||||||
(out, real_addr)
|
|
||||||
});
|
|
||||||
|
|
||||||
Box::new(fut) as Box<Future<Item = _, Error = _> + Send>
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok((Box::new(listener) as Box<_>, listened_addr))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
|
||||||
match multiaddr_to_peerid(addr.clone()) {
|
|
||||||
Ok(peer_id) => {
|
|
||||||
// If the multiaddress is a peer ID, try each known multiaddress (taken from the
|
|
||||||
// address resolved) one by one.
|
|
||||||
let addrs = {
|
|
||||||
let resolver = &self.addr_resolver;
|
|
||||||
resolver(peer_id.clone()).into_iter()
|
|
||||||
};
|
|
||||||
|
|
||||||
trace!("Try dialing peer ID {:?} ; loading multiaddrs from addr resolver", peer_id);
|
|
||||||
|
|
||||||
let transport = self.transport;
|
|
||||||
let future = stream::iter_ok(addrs)
|
|
||||||
// Try to dial each address through the transport.
|
|
||||||
.filter_map(move |addr| {
|
|
||||||
match transport.clone().dial(addr) {
|
|
||||||
Ok(dial) => Some(dial),
|
|
||||||
Err((_, addr)) => {
|
|
||||||
debug!("Address {} not supported by underlying transport", addr);
|
|
||||||
None
|
|
||||||
},
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.and_then(move |dial| dial)
|
|
||||||
// Pick the first non-failing dial result by filtering out the ones which fail.
|
|
||||||
.then(|res| Ok(res))
|
|
||||||
.filter_map(|res| res.ok())
|
|
||||||
.into_future()
|
|
||||||
.map_err(|(err, _)| err)
|
|
||||||
.and_then(move |(connec, _)| {
|
|
||||||
match connec {
|
|
||||||
Some(connec) => Ok((connec, peer_id)),
|
|
||||||
None => {
|
|
||||||
debug!("All multiaddresses failed when dialing peer {:?}", peer_id);
|
|
||||||
Err(IoError::new(IoErrorKind::Other, "couldn't find any multiaddress for peer"))
|
|
||||||
},
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.and_then(move |((connec, original_addr), peer_id)| {
|
|
||||||
original_addr.map(move |addr| (connec, addr, peer_id))
|
|
||||||
})
|
|
||||||
.and_then(move |(connec, original_addr, peer_id)| {
|
|
||||||
debug!("Successfully dialed peer {:?} through {}", peer_id, original_addr);
|
|
||||||
let out = PeerIdTransportOutput {
|
|
||||||
socket: connec.socket,
|
|
||||||
info: connec.info,
|
|
||||||
original_addr: original_addr,
|
|
||||||
};
|
|
||||||
// Replace the multiaddress with the one of the form `/p2p/...` or `/ipfs/...`.
|
|
||||||
Ok((out, Box::new(future::ok(addr)) as Box<Future<Item = _, Error = _> + Send>))
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(Box::new(future) as Box<_>)
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(addr) => {
|
|
||||||
// If the multiaddress is something else, propagate it to the underlying transport.
|
|
||||||
trace!("Propagating {} to the underlying transport", addr);
|
|
||||||
let dial = match self.transport.dial(addr) {
|
|
||||||
Ok(d) => d,
|
|
||||||
Err((inner, addr)) => {
|
|
||||||
let id = PeerIdTransport {
|
|
||||||
transport: inner,
|
|
||||||
addr_resolver: self.addr_resolver,
|
|
||||||
};
|
|
||||||
return Err((id, addr));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let future = dial
|
|
||||||
.and_then(move |(connec, original_addr)| {
|
|
||||||
original_addr.map(move |addr| (connec, addr))
|
|
||||||
})
|
|
||||||
.map(move |(connec, original_addr)| {
|
|
||||||
debug!("Successfully dialed {}", original_addr);
|
|
||||||
let info = connec.info.shared();
|
|
||||||
let out = PeerIdTransportOutput {
|
|
||||||
socket: connec.socket,
|
|
||||||
info: Box::new(info.clone()
|
|
||||||
.map(move |info| (*info).clone())
|
|
||||||
.map_err(move |err| { let k = err.kind(); IoError::new(k, err) })),
|
|
||||||
original_addr: original_addr.clone(),
|
|
||||||
};
|
|
||||||
let real_addr = Box::new(info
|
|
||||||
.map_err(move |err| { let k = err.kind(); IoError::new(k, err) })
|
|
||||||
.map(move |info| {
|
|
||||||
let peer_id = info.info.public_key.clone().into_peer_id();
|
|
||||||
debug!("Identified {} as {:?}", original_addr, peer_id);
|
|
||||||
Protocol::P2p(peer_id.into()).into()
|
|
||||||
})) as Box<Future<Item = _, Error = _> + Send>;
|
|
||||||
(out, real_addr)
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(Box::new(future) as Box<_>)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option<Multiaddr> {
|
|
||||||
self.transport.nat_traversal(a, b)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Trans, AddrRes, AddrResOut> MuxedTransport for PeerIdTransport<Trans, AddrRes>
|
|
||||||
where
|
|
||||||
Trans: MuxedTransport + Clone + Send + 'static,
|
|
||||||
Trans::Dial: Send,
|
|
||||||
Trans::Listener: Send,
|
|
||||||
Trans::ListenerUpgrade: Send,
|
|
||||||
Trans::MultiaddrFuture: Send,
|
|
||||||
Trans::Output: AsyncRead + AsyncWrite + Send,
|
|
||||||
Trans::Incoming: Send,
|
|
||||||
Trans::IncomingUpgrade: Send,
|
|
||||||
AddrRes: Fn(PeerId) -> AddrResOut + 'static, // TODO: 'static :(
|
|
||||||
AddrResOut: IntoIterator<Item = Multiaddr> + 'static, // TODO: 'static :(
|
|
||||||
AddrResOut::IntoIter: Send,
|
|
||||||
{
|
|
||||||
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError> + Send>;
|
|
||||||
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn next_incoming(self) -> Self::Incoming {
|
|
||||||
let future = self.transport.next_incoming().map(move |incoming| {
|
|
||||||
let future = incoming
|
|
||||||
.and_then(move |(connec, original_addr)| {
|
|
||||||
original_addr.map(move |addr| (connec, addr))
|
|
||||||
})
|
|
||||||
.map(move |(connec, original_addr)| {
|
|
||||||
debug!("Successful incoming substream from {}", original_addr);
|
|
||||||
let info = connec.info.shared();
|
|
||||||
let out = PeerIdTransportOutput {
|
|
||||||
socket: connec.socket,
|
|
||||||
info: Box::new(info.clone()
|
|
||||||
.map(move |info| (*info).clone())
|
|
||||||
.map_err(move |err| { let k = err.kind(); IoError::new(k, err) })),
|
|
||||||
original_addr: original_addr.clone(),
|
|
||||||
};
|
|
||||||
let real_addr = Box::new(info
|
|
||||||
.map_err(move |err| { let k = err.kind(); IoError::new(k, err) })
|
|
||||||
.map(move |info| {
|
|
||||||
let peer_id = info.info.public_key.clone().into_peer_id();
|
|
||||||
debug!("Identified {} as {:?}", original_addr, peer_id);
|
|
||||||
Protocol::P2p(peer_id.into()).into()
|
|
||||||
})) as Box<Future<Item = _, Error = _> + Send>;
|
|
||||||
(out, real_addr)
|
|
||||||
});
|
|
||||||
|
|
||||||
Box::new(future) as Box<Future<Item = _, Error = _> + Send>
|
|
||||||
});
|
|
||||||
|
|
||||||
Box::new(future) as Box<_>
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Output of the identify transport.
|
|
||||||
pub struct PeerIdTransportOutput<S> {
|
|
||||||
/// The socket to communicate with the remote.
|
|
||||||
pub socket: S,
|
|
||||||
|
|
||||||
/// Identification of the remote.
|
|
||||||
/// This may not be known immediately, hence why we use a future.
|
|
||||||
pub info: Box<Future<Item = IdentifyTransportOutcome, Error = IoError> + Send>,
|
|
||||||
|
|
||||||
/// Original address of the remote.
|
|
||||||
/// This layer turns the address of the remote into the `/p2p/...` form, but stores the
|
|
||||||
/// original address in this field.
|
|
||||||
pub original_addr: Multiaddr,
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the multiaddress is in the form `/p2p/...`, turn it into a `PeerId`.
|
|
||||||
// Otherwise, return it as-is.
|
|
||||||
fn multiaddr_to_peerid(addr: Multiaddr) -> Result<PeerId, Multiaddr> {
|
|
||||||
if addr.iter().next().is_none() {
|
|
||||||
return Err(addr)
|
|
||||||
}
|
|
||||||
match addr.iter().last() {
|
|
||||||
Some(Protocol::P2p(ref peer_id)) => {
|
|
||||||
match PeerId::from_multihash(peer_id.clone()) {
|
|
||||||
Ok(peer_id) => Ok(peer_id),
|
|
||||||
Err(_) => Err(addr),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => Err(addr),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
extern crate libp2p_tcp_transport;
|
|
||||||
extern crate tokio_current_thread;
|
|
||||||
|
|
||||||
use self::libp2p_tcp_transport::TcpConfig;
|
|
||||||
use PeerIdTransport;
|
|
||||||
use futures::{Future, Stream};
|
|
||||||
use libp2p_core::{Transport, PeerId, PublicKey};
|
|
||||||
use multiaddr::{Protocol, Multiaddr};
|
|
||||||
use std::io::Error as IoError;
|
|
||||||
use std::iter;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn dial_peer_id() {
|
|
||||||
// When we dial an `/p2p/...` address, the `PeerIdTransport` should look into the
|
|
||||||
// peerstore and dial one of the known multiaddresses of the node instead.
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
struct UnderlyingTrans {
|
|
||||||
inner: TcpConfig,
|
|
||||||
}
|
|
||||||
impl Transport for UnderlyingTrans {
|
|
||||||
type Output = <TcpConfig as Transport>::Output;
|
|
||||||
type MultiaddrFuture = <TcpConfig as Transport>::MultiaddrFuture;
|
|
||||||
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
|
|
||||||
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
|
|
||||||
type Dial = <TcpConfig as Transport>::Dial;
|
|
||||||
#[inline]
|
|
||||||
fn listen_on(
|
|
||||||
self,
|
|
||||||
_: Multiaddr,
|
|
||||||
) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
|
||||||
unreachable!()
|
|
||||||
}
|
|
||||||
#[inline]
|
|
||||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
|
||||||
assert_eq!(
|
|
||||||
addr,
|
|
||||||
"/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()
|
|
||||||
);
|
|
||||||
Ok(self.inner.dial(addr).unwrap_or_else(|_| panic!()))
|
|
||||||
}
|
|
||||||
#[inline]
|
|
||||||
fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option<Multiaddr> {
|
|
||||||
self.inner.nat_traversal(a, b)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let peer_id = PeerId::from_public_key(PublicKey::Ed25519(vec![1, 2, 3, 4]));
|
|
||||||
|
|
||||||
let underlying = UnderlyingTrans {
|
|
||||||
inner: TcpConfig::new(),
|
|
||||||
};
|
|
||||||
let transport = PeerIdTransport::new(underlying, {
|
|
||||||
let peer_id = peer_id.clone();
|
|
||||||
move |addr| {
|
|
||||||
assert_eq!(addr, peer_id);
|
|
||||||
vec!["/ip4/127.0.0.1/tcp/12345".parse().unwrap()]
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let future = transport
|
|
||||||
.dial(iter::once(Protocol::P2p(peer_id.into())).collect())
|
|
||||||
.unwrap_or_else(|_| panic!())
|
|
||||||
.then::<_, Result<(), ()>>(|_| Ok(()));
|
|
||||||
|
|
||||||
let _ = tokio_current_thread::block_on_all(future).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user