From 1e223f627dba3e29b4313ba44f2afa608cc7b385 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 18 Oct 2018 15:21:14 +0100 Subject: [PATCH] Remove other unused files (#570) --- protocols/identify/src/identify_transport.rs | 333 ----------------- protocols/identify/src/peer_id_transport.rs | 370 ------------------- 2 files changed, 703 deletions(-) delete mode 100644 protocols/identify/src/identify_transport.rs delete mode 100644 protocols/identify/src/peer_id_transport.rs diff --git a/protocols/identify/src/identify_transport.rs b/protocols/identify/src/identify_transport.rs deleted file mode 100644 index dc039c0d..00000000 --- a/protocols/identify/src/identify_transport.rs +++ /dev/null @@ -1,333 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use fnv::FnvHashMap; -use futures::{future, Future, Stream}; -use libp2p_core::{Multiaddr, MuxedTransport, Transport}; -use parking_lot::Mutex; -use protocol::{IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig}; -use std::collections::hash_map::Entry; -use std::error::Error; -use std::io::Error as IoError; -use std::sync::Arc; -use tokio_io::{AsyncRead, AsyncWrite}; - -/// Implementation of `Transport`. See [the crate root description](index.html). -pub struct IdentifyTransport { - transport: Trans, - // Each entry is protected by an asynchronous mutex, so that if we dial the same node twice - // simultaneously, the second time will block until the first time has identified it. - cache: Arc>>, -} - -impl Clone for IdentifyTransport - where Trans: Clone, -{ - fn clone(&self) -> Self { - IdentifyTransport { - transport: self.transport.clone(), - cache: self.cache.clone(), - } - } -} - -type CacheEntry = future::Shared + Send>>; - -impl IdentifyTransport { - /// Creates an `IdentifyTransport` that wraps around the given transport and peerstore. - #[inline] - pub fn new(transport: Trans) -> Self { - IdentifyTransport { - transport, - cache: Arc::new(Mutex::new(Default::default())), - } - } -} - -impl Transport for IdentifyTransport -where - Trans: Transport + Clone + Send + 'static, // TODO: 'static :( - Trans::Dial: Send, - Trans::Listener: Send, - Trans::ListenerUpgrade: Send, - Trans::MultiaddrFuture: Send, - Trans::Output: AsyncRead + AsyncWrite + Send, -{ - type Output = IdentifyTransportOutput; - type MultiaddrFuture = future::FutureResult; - type Listener = Box + Send>; - type ListenerUpgrade = Box + Send>; - type Dial = Box + Send>; - - #[inline] - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let (listener, new_addr) = match self.transport.clone().listen_on(addr.clone()) { - Ok((l, a)) => (l, a), - Err((inner, addr)) => { - let id = IdentifyTransport { - transport: inner, - cache: self.cache, - }; - return Err((id, addr)); - } - }; - - let identify_upgrade = self.transport.with_upgrade(IdentifyProtocolConfig); - let cache = self.cache.clone(); - - let listener = listener.map(move |connec| { - let identify_upgrade = identify_upgrade.clone(); - let cache = cache.clone(); - let fut = connec - .and_then(move |(connec, client_addr)| { - trace!("Incoming connection, waiting for client address"); - client_addr.map(move |addr| (connec, addr)) - }) - .and_then(move |(connec, client_addr)| { - debug!("Incoming connection from {}", client_addr); - - // Dial the address that connected to us and try upgrade with the - // identify protocol. - let info_future = cache_entry(&cache, client_addr.clone(), { let client_addr = client_addr.clone(); move || { - debug!("No cache entry for {}, dialing back in order to identify", client_addr); - future::lazy(|| { trace!("Starting identify back"); identify_upgrade - .dial(client_addr) - .unwrap_or_else(|(_, addr)| { - panic!("the multiaddr {} was determined to be valid earlier", addr) - }) }) - .map(move |(identify, _)| { - let (info, observed_addr) = match identify { - IdentifyOutput::RemoteInfo { info, observed_addr } => { - (info, observed_addr) - }, - _ => unreachable!( - "the identify protocol guarantees that we receive \ - remote information when we dial a node" - ), - }; - - debug!("Identified dialed back connection as pubkey {:?}", info.public_key); - IdentifyTransportOutcome { - info, - observed_addr, - } - }) - .map_err(move |err| { - debug!("Failed to identify dialed back connection"); - err - }) - }}); - - let out = IdentifyTransportOutput { - socket: connec, - info: Box::new(info_future), - }; - - Ok((out, future::ok(client_addr))) - }); - - Box::new(fut) as Box + Send> - }); - - Ok((Box::new(listener) as Box<_>, new_addr)) - } - - #[inline] - fn dial(self, addr: Multiaddr) -> Result { - // We dial a first time the node. - let dial = match self.transport.clone().dial(addr) { - Ok(d) => d, - Err((transport, addr)) => { - let id = IdentifyTransport { - transport, - cache: self.cache, - }; - return Err((id, addr)); - } - }; - - // Once successfully dialed, we dial again to identify. - let identify_upgrade = self.transport.with_upgrade(IdentifyProtocolConfig); - let cache = self.cache.clone(); - let future = dial - .and_then(move |(connec, client_addr)| { - trace!("Dialing successful, waiting for client address"); - client_addr.map(move |addr| (connec, addr)) - }) - .and_then(move |(socket, addr)| { - trace!("Dialing successful ; client address is {}", addr); - let info_future = cache_entry(&cache, addr.clone(), { let addr = addr.clone(); move || { - trace!("No cache entry for {} ; dialing again for identification", addr); - future::lazy(|| { trace!("Starting identify back"); identify_upgrade - .dial(addr) - .unwrap_or_else(|(_, addr)| { - panic!("the multiaddr {} was determined to be valid earlier", addr) - }) }) - .map(move |(identify, _)| { - let (info, observed_addr) = match identify { - IdentifyOutput::RemoteInfo { info, observed_addr } => { - (info, observed_addr) - } - _ => unreachable!( - "the identify protocol guarantees that we receive \ - remote information when we dial a node" - ), - }; - - IdentifyTransportOutcome { - info, - observed_addr, - } - }) - }}); - - let out = IdentifyTransportOutput { - socket: socket, - info: Box::new(info_future), - }; - - Ok((out, future::ok(addr))) - }); - - Ok(Box::new(future) as Box<_>) - } - - #[inline] - fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option { - self.transport.nat_traversal(a, b) - } -} - -impl MuxedTransport for IdentifyTransport -where - Trans: MuxedTransport + Clone + Send + 'static, - Trans::Dial: Send, - Trans::Listener: Send, - Trans::ListenerUpgrade: Send, - Trans::MultiaddrFuture: Send, - Trans::Output: AsyncRead + AsyncWrite + Send, - Trans::Incoming: Send, - Trans::IncomingUpgrade: Send, -{ - type Incoming = Box + Send>; - type IncomingUpgrade = Box + Send>; - - #[inline] - fn next_incoming(self) -> Self::Incoming { - let identify_upgrade = self.transport.clone().with_upgrade(IdentifyProtocolConfig); - let cache = self.cache.clone(); - - let future = self.transport.next_incoming().map(move |incoming| { - let cache = cache.clone(); - let future = incoming - .and_then(move |(connec, client_addr)| { - debug!("Incoming substream ; waiting for client address"); - client_addr.map(move |addr| (connec, addr)) - }) - .and_then(move |(connec, client_addr)| { - debug!("Incoming substream from {}", client_addr); - - // Dial the address that connected to us and try upgrade with the - // identify protocol. - let info_future = cache_entry(&cache, client_addr.clone(), { let client_addr = client_addr.clone(); move || { - debug!("No cache entry from {} ; dialing back to identify", client_addr); - future::lazy(|| { trace!("Starting identify back"); identify_upgrade - .dial(client_addr) - .unwrap_or_else(|(_, client_addr)| { - panic!("the multiaddr {} was determined to be valid earlier", client_addr) - }) }) - .map(move |(identify, _)| { - let (info, observed_addr) = match identify { - IdentifyOutput::RemoteInfo { info, observed_addr } => { - (info, observed_addr) - }, - _ => unreachable!( - "the identify protocol guarantees that we receive \ - remote information when we dial a node" - ), - }; - - debug!("Identified incoming substream as pubkey {:?}", info.public_key); - IdentifyTransportOutcome { - info, - observed_addr, - } - }) - .map_err(move |err| { - debug!("Failed to identify incoming substream"); - err - }) - }}); - - let out = IdentifyTransportOutput { - socket: connec, - info: Box::new(info_future), - }; - - Ok((out, future::ok(client_addr))) - }); - - Box::new(future) as Box + Send> - }); - - Box::new(future) as Box<_> - } -} - -/// Output of the identify transport. -pub struct IdentifyTransportOutput { - /// The socket to communicate with the remote. - pub socket: S, - /// Outcome of the identification of the remote. - pub info: Box + Send>, -} - -/// Outcome of the identification of the remote. -#[derive(Debug, Clone)] -pub struct IdentifyTransportOutcome { - /// Identification of the remote. - pub info: IdentifyInfo, - /// Address the remote sees for us. - pub observed_addr: Multiaddr, -} - -fn cache_entry(cache: &Mutex>, addr: Multiaddr, if_no_entry: F) - -> impl Future -where F: FnOnce() -> Fut, - Fut: Future + Send + 'static, -{ - trace!("Looking up cache entry for {}", addr); - let mut cache = cache.lock(); - match cache.entry(addr) { - Entry::Occupied(entry) => { - trace!("Cache entry found, cloning"); - future::Either::A(entry.get().clone()) - }, - - Entry::Vacant(entry) => { - trace!("No cache entry available"); - let future = (Box::new(if_no_entry()) as Box + Send>).shared(); - entry.insert(future.clone()); - future::Either::B(future) - }, - }.map(|out| (*out).clone()).map_err(|err| IoError::new(err.kind(), err.description())) -} - -// TODO: test that we receive back what the remote sent us diff --git a/protocols/identify/src/peer_id_transport.rs b/protocols/identify/src/peer_id_transport.rs deleted file mode 100644 index b810ac7d..00000000 --- a/protocols/identify/src/peer_id_transport.rs +++ /dev/null @@ -1,370 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use futures::{future, stream, Future, Stream}; -use identify_transport::{IdentifyTransport, IdentifyTransportOutcome}; -use libp2p_core::{PeerId, MuxedTransport, Transport}; -use multiaddr::{Protocol, Multiaddr}; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use tokio_io::{AsyncRead, AsyncWrite}; - -/// Implementation of `Transport`. See [the crate root description](index.html). -#[derive(Clone)] -pub struct PeerIdTransport { - transport: IdentifyTransport, - addr_resolver: AddrRes, -} - -impl PeerIdTransport { - /// Creates an `PeerIdTransport` that wraps around the given transport and address resolver. - #[inline] - pub fn new(transport: Trans, addr_resolver: AddrRes) -> Self { - PeerIdTransport { - transport: IdentifyTransport::new(transport), - addr_resolver, - } - } -} - -impl Transport for PeerIdTransport -where - Trans: Transport + Clone + Send + 'static, // TODO: 'static :( - Trans::Dial: Send, - Trans::Listener: Send, - Trans::ListenerUpgrade: Send, - Trans::MultiaddrFuture: Send, - Trans::Output: AsyncRead + AsyncWrite + Send, - AddrRes: Fn(PeerId) -> AddrResOut + 'static, // TODO: 'static :( - AddrResOut: IntoIterator + 'static, // TODO: 'static :( - AddrResOut::IntoIter: Send, -{ - type Output = PeerIdTransportOutput; - type MultiaddrFuture = Box + Send>; - type Listener = Box + Send>; - type ListenerUpgrade = Box + Send>; - type Dial = Box + Send>; - - #[inline] - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - // Note that `listen_on` expects a "regular" multiaddr (eg. `/ip/.../tcp/...`), - // and not `/p2p/`. - - let (listener, listened_addr) = match self.transport.listen_on(addr) { - Ok((listener, addr)) => (listener, addr), - Err((inner, addr)) => { - let id = PeerIdTransport { - transport: inner, - addr_resolver: self.addr_resolver, - }; - - return Err((id, addr)); - } - }; - - let listener = listener.map(move |connec| { - let fut = connec - .and_then(move |(connec, client_addr)| { - client_addr.map(move |addr| (connec, addr)) - }) - .map(move |(connec, original_addr)| { - debug!("Successfully incoming connection from {}", original_addr); - let info = connec.info.shared(); - let out = PeerIdTransportOutput { - socket: connec.socket, - info: Box::new(info.clone() - .map(move |info| (*info).clone()) - .map_err(move |err| { let k = err.kind(); IoError::new(k, err) })), - original_addr: original_addr.clone(), - }; - let real_addr = Box::new(info - .map_err(move |err| { let k = err.kind(); IoError::new(k, err) }) - .map(move |info| { - let peer_id = info.info.public_key.clone().into_peer_id(); - debug!("Identified {} as {:?}", original_addr, peer_id); - Protocol::P2p(peer_id.into()).into() - })) as Box + Send>; - (out, real_addr) - }); - - Box::new(fut) as Box + Send> - }); - - Ok((Box::new(listener) as Box<_>, listened_addr)) - } - - #[inline] - fn dial(self, addr: Multiaddr) -> Result { - match multiaddr_to_peerid(addr.clone()) { - Ok(peer_id) => { - // If the multiaddress is a peer ID, try each known multiaddress (taken from the - // address resolved) one by one. - let addrs = { - let resolver = &self.addr_resolver; - resolver(peer_id.clone()).into_iter() - }; - - trace!("Try dialing peer ID {:?} ; loading multiaddrs from addr resolver", peer_id); - - let transport = self.transport; - let future = stream::iter_ok(addrs) - // Try to dial each address through the transport. - .filter_map(move |addr| { - match transport.clone().dial(addr) { - Ok(dial) => Some(dial), - Err((_, addr)) => { - debug!("Address {} not supported by underlying transport", addr); - None - }, - } - }) - .and_then(move |dial| dial) - // Pick the first non-failing dial result by filtering out the ones which fail. - .then(|res| Ok(res)) - .filter_map(|res| res.ok()) - .into_future() - .map_err(|(err, _)| err) - .and_then(move |(connec, _)| { - match connec { - Some(connec) => Ok((connec, peer_id)), - None => { - debug!("All multiaddresses failed when dialing peer {:?}", peer_id); - Err(IoError::new(IoErrorKind::Other, "couldn't find any multiaddress for peer")) - }, - } - }) - .and_then(move |((connec, original_addr), peer_id)| { - original_addr.map(move |addr| (connec, addr, peer_id)) - }) - .and_then(move |(connec, original_addr, peer_id)| { - debug!("Successfully dialed peer {:?} through {}", peer_id, original_addr); - let out = PeerIdTransportOutput { - socket: connec.socket, - info: connec.info, - original_addr: original_addr, - }; - // Replace the multiaddress with the one of the form `/p2p/...` or `/ipfs/...`. - Ok((out, Box::new(future::ok(addr)) as Box + Send>)) - }); - - Ok(Box::new(future) as Box<_>) - } - - Err(addr) => { - // If the multiaddress is something else, propagate it to the underlying transport. - trace!("Propagating {} to the underlying transport", addr); - let dial = match self.transport.dial(addr) { - Ok(d) => d, - Err((inner, addr)) => { - let id = PeerIdTransport { - transport: inner, - addr_resolver: self.addr_resolver, - }; - return Err((id, addr)); - } - }; - - let future = dial - .and_then(move |(connec, original_addr)| { - original_addr.map(move |addr| (connec, addr)) - }) - .map(move |(connec, original_addr)| { - debug!("Successfully dialed {}", original_addr); - let info = connec.info.shared(); - let out = PeerIdTransportOutput { - socket: connec.socket, - info: Box::new(info.clone() - .map(move |info| (*info).clone()) - .map_err(move |err| { let k = err.kind(); IoError::new(k, err) })), - original_addr: original_addr.clone(), - }; - let real_addr = Box::new(info - .map_err(move |err| { let k = err.kind(); IoError::new(k, err) }) - .map(move |info| { - let peer_id = info.info.public_key.clone().into_peer_id(); - debug!("Identified {} as {:?}", original_addr, peer_id); - Protocol::P2p(peer_id.into()).into() - })) as Box + Send>; - (out, real_addr) - }); - - Ok(Box::new(future) as Box<_>) - } - } - } - - #[inline] - fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option { - self.transport.nat_traversal(a, b) - } -} - -impl MuxedTransport for PeerIdTransport -where - Trans: MuxedTransport + Clone + Send + 'static, - Trans::Dial: Send, - Trans::Listener: Send, - Trans::ListenerUpgrade: Send, - Trans::MultiaddrFuture: Send, - Trans::Output: AsyncRead + AsyncWrite + Send, - Trans::Incoming: Send, - Trans::IncomingUpgrade: Send, - AddrRes: Fn(PeerId) -> AddrResOut + 'static, // TODO: 'static :( - AddrResOut: IntoIterator + 'static, // TODO: 'static :( - AddrResOut::IntoIter: Send, -{ - type Incoming = Box + Send>; - type IncomingUpgrade = Box + Send>; - - #[inline] - fn next_incoming(self) -> Self::Incoming { - let future = self.transport.next_incoming().map(move |incoming| { - let future = incoming - .and_then(move |(connec, original_addr)| { - original_addr.map(move |addr| (connec, addr)) - }) - .map(move |(connec, original_addr)| { - debug!("Successful incoming substream from {}", original_addr); - let info = connec.info.shared(); - let out = PeerIdTransportOutput { - socket: connec.socket, - info: Box::new(info.clone() - .map(move |info| (*info).clone()) - .map_err(move |err| { let k = err.kind(); IoError::new(k, err) })), - original_addr: original_addr.clone(), - }; - let real_addr = Box::new(info - .map_err(move |err| { let k = err.kind(); IoError::new(k, err) }) - .map(move |info| { - let peer_id = info.info.public_key.clone().into_peer_id(); - debug!("Identified {} as {:?}", original_addr, peer_id); - Protocol::P2p(peer_id.into()).into() - })) as Box + Send>; - (out, real_addr) - }); - - Box::new(future) as Box + Send> - }); - - Box::new(future) as Box<_> - } -} - -/// Output of the identify transport. -pub struct PeerIdTransportOutput { - /// The socket to communicate with the remote. - pub socket: S, - - /// Identification of the remote. - /// This may not be known immediately, hence why we use a future. - pub info: Box + Send>, - - /// Original address of the remote. - /// This layer turns the address of the remote into the `/p2p/...` form, but stores the - /// original address in this field. - pub original_addr: Multiaddr, -} - -// If the multiaddress is in the form `/p2p/...`, turn it into a `PeerId`. -// Otherwise, return it as-is. -fn multiaddr_to_peerid(addr: Multiaddr) -> Result { - if addr.iter().next().is_none() { - return Err(addr) - } - match addr.iter().last() { - Some(Protocol::P2p(ref peer_id)) => { - match PeerId::from_multihash(peer_id.clone()) { - Ok(peer_id) => Ok(peer_id), - Err(_) => Err(addr), - } - } - _ => Err(addr), - } -} - -#[cfg(test)] -mod tests { - extern crate libp2p_tcp_transport; - extern crate tokio_current_thread; - - use self::libp2p_tcp_transport::TcpConfig; - use PeerIdTransport; - use futures::{Future, Stream}; - use libp2p_core::{Transport, PeerId, PublicKey}; - use multiaddr::{Protocol, Multiaddr}; - use std::io::Error as IoError; - use std::iter; - - #[test] - fn dial_peer_id() { - // When we dial an `/p2p/...` address, the `PeerIdTransport` should look into the - // peerstore and dial one of the known multiaddresses of the node instead. - - #[derive(Debug, Clone)] - struct UnderlyingTrans { - inner: TcpConfig, - } - impl Transport for UnderlyingTrans { - type Output = ::Output; - type MultiaddrFuture = ::MultiaddrFuture; - type Listener = Box + Send>; - type ListenerUpgrade = Box + Send>; - type Dial = ::Dial; - #[inline] - fn listen_on( - self, - _: Multiaddr, - ) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - unreachable!() - } - #[inline] - fn dial(self, addr: Multiaddr) -> Result { - assert_eq!( - addr, - "/ip4/127.0.0.1/tcp/12345".parse::().unwrap() - ); - Ok(self.inner.dial(addr).unwrap_or_else(|_| panic!())) - } - #[inline] - fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option { - self.inner.nat_traversal(a, b) - } - } - - let peer_id = PeerId::from_public_key(PublicKey::Ed25519(vec![1, 2, 3, 4])); - - let underlying = UnderlyingTrans { - inner: TcpConfig::new(), - }; - let transport = PeerIdTransport::new(underlying, { - let peer_id = peer_id.clone(); - move |addr| { - assert_eq!(addr, peer_id); - vec!["/ip4/127.0.0.1/tcp/12345".parse().unwrap()] - } - }); - - let future = transport - .dial(iter::once(Protocol::P2p(peer_id.into())).collect()) - .unwrap_or_else(|_| panic!()) - .then::<_, Result<(), ()>>(|_| Ok(())); - - let _ = tokio_current_thread::block_on_all(future).unwrap(); - } -}