diff --git a/core/src/transport/denied.rs b/core/src/transport/denied.rs deleted file mode 100644 index 87a0beb4..00000000 --- a/core/src/transport/denied.rs +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use futures::prelude::*; -use multiaddr::Multiaddr; -use std::io::{self, Cursor}; -use transport::Transport; - -/// Dummy implementation of `Transport` that just denies every single attempt. -#[derive(Debug, Copy, Clone)] -pub struct DeniedTransport; - -impl Transport for DeniedTransport { - // TODO: could use `!` for associated types once stable - type Output = Cursor>; - type Listener = Box + Send + Sync>; - type ListenerUpgrade = Box + Send + Sync>; - type Dial = Box + Send + Sync>; - - #[inline] - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - Err((DeniedTransport, addr)) - } - - #[inline] - fn dial(self, addr: Multiaddr) -> Result { - Err((DeniedTransport, addr)) - } - - #[inline] - fn nat_traversal(&self, _: &Multiaddr, _: &Multiaddr) -> Option { - None - } -} diff --git a/core/src/transport/interruptible.rs b/core/src/transport/interruptible.rs deleted file mode 100644 index db24e995..00000000 --- a/core/src/transport/interruptible.rs +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use futures::{future, prelude::*, sync::oneshot}; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use transport::Transport; -use Multiaddr; - -/// See `Transport::interruptible`. -#[derive(Debug, Clone)] -pub struct Interruptible { - transport: T, - rx: future::Shared>, -} - -impl Interruptible { - /// Internal function that builds an `Interruptible`. - #[inline] - pub(crate) fn new(transport: T) -> (Interruptible, Interrupt) { - let (_tx, rx) = oneshot::channel(); - let transport = Interruptible { transport, rx: rx.shared() }; - let int = Interrupt { _tx }; - (transport, int) - } -} - -impl Transport for Interruptible -where - T: Transport, -{ - type Output = T::Output; - type Listener = T::Listener; - type ListenerUpgrade = T::ListenerUpgrade; - type Dial = InterruptibleDial; - - #[inline] - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - match self.transport.listen_on(addr) { - Ok(val) => Ok(val), - Err((transport, addr)) => Err((Interruptible { transport, rx: self.rx }, addr)), - } - } - - #[inline] - fn dial(self, addr: Multiaddr) -> Result { - match self.transport.dial(addr) { - Ok(future) => { - Ok(InterruptibleDial { - inner: future, - rx: self.rx, - }) - } - Err((transport, addr)) => Err((Interruptible { transport, rx: self.rx }, addr)), - } - } - - #[inline] - fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.transport.nat_traversal(server, observed) - } -} - -/// Dropping this object interrupts the dialing of the corresponding `Interruptible`. -pub struct Interrupt { - _tx: oneshot::Sender<()>, -} - -#[must_use = "futures do nothing unless polled"] -pub struct InterruptibleDial { - inner: F, - rx: future::Shared>, -} - -impl Future for InterruptibleDial - where F: Future -{ - type Item = F::Item; - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::Ready(_)) | Err(_) => { - return Err(IoError::new(IoErrorKind::ConnectionAborted, "connection interrupted")); - }, - Ok(Async::NotReady) => (), - }; - - self.inner.poll() - } -} diff --git a/core/src/transport/mod.rs b/core/src/transport/mod.rs index 04dbff8a..e3b4c7f3 100644 --- a/core/src/transport/mod.rs +++ b/core/src/transport/mod.rs @@ -29,18 +29,15 @@ //! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades //! together in a complex chain of protocols negotiation. -use crate::{InboundUpgrade, OutboundUpgrade, Endpoint}; +use crate::{InboundUpgrade, OutboundUpgrade, Endpoint, nodes::raw_swarm::ConnectedPoint}; use futures::prelude::*; use multiaddr::Multiaddr; -use nodes::raw_swarm::ConnectedPoint; use std::io::Error as IoError; use tokio_io::{AsyncRead, AsyncWrite}; pub mod and_then; pub mod boxed; pub mod choice; -pub mod denied; -pub mod interruptible; pub mod map; pub mod map_err; pub mod map_err_dial; @@ -48,7 +45,6 @@ pub mod memory; pub mod upgrade; pub use self::choice::OrTransport; -pub use self::denied::DeniedTransport; pub use self::memory::connector; pub use self::upgrade::Upgrade; @@ -202,13 +198,4 @@ pub trait Transport { { and_then::and_then(self, upgrade) } - - /// Wraps around the `Transport` and makes it interruptible. - #[inline] - fn interruptible(self) -> (interruptible::Interruptible, interruptible::Interrupt) - where - Self: Sized, - { - interruptible::Interruptible::new(self) - } } diff --git a/core/src/upgrade/denied.rs b/core/src/upgrade/denied.rs index d1d748b9..2a71e94d 100644 --- a/core/src/upgrade/denied.rs +++ b/core/src/upgrade/denied.rs @@ -20,7 +20,7 @@ use bytes::Bytes; use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use futures::future::FutureResult; +use futures::future; use std::iter; use void::{unreachable, Void}; @@ -41,7 +41,7 @@ impl UpgradeInfo for DeniedUpgrade { impl InboundUpgrade for DeniedUpgrade { type Output = Void; type Error = Void; - type Future = FutureResult; + type Future = future::Empty; fn upgrade_inbound(self, _: C, id: Self::UpgradeId) -> Self::Future { unreachable(id) @@ -51,7 +51,7 @@ impl InboundUpgrade for DeniedUpgrade { impl OutboundUpgrade for DeniedUpgrade { type Output = Void; type Error = Void; - type Future = FutureResult; + type Future = future::Empty; fn upgrade_outbound(self, _: C, id: Self::UpgradeId) -> Self::Future { unreachable(id) diff --git a/transports/relay/src/transport.rs b/transports/relay/src/transport.rs index 27f8a3e6..f8ff605c 100644 --- a/transports/relay/src/transport.rs +++ b/transports/relay/src/transport.rs @@ -24,7 +24,7 @@ use crate::{ protocol, utility::{Peer, RelayAddr} }; -use futures::{stream, prelude::*}; +use futures::{future, stream, prelude::*}; use libp2p_core::{transport::Transport, upgrade::apply_outbound}; use log::{debug, info, trace}; use multiaddr::Multiaddr; @@ -45,16 +45,14 @@ impl Transport for RelayTransport where T: Transport + Send + Clone + 'static, T::Dial: Send, - T::Listener: Send, - T::ListenerUpgrade: Send, T::Output: AsyncRead + AsyncWrite + Send, P: Deref + Clone + 'static, S: 'static, for<'a> &'a S: Peerstore { type Output = T::Output; - type Listener = Box + Send>; - type ListenerUpgrade = Box + Send>; + type Listener = stream::Empty<(Self::ListenerUpgrade, Multiaddr), io::Error>; + type ListenerUpgrade = future::Empty; type Dial = Box + Send>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -92,8 +90,6 @@ impl RelayTransport where T: Transport + Clone + 'static, T::Dial: Send, - T::Listener: Send, - T::ListenerUpgrade: Send, T::Output: AsyncRead + AsyncWrite + Send, P: Deref + Clone + 'static, for<'a> &'a S: Peerstore diff --git a/transports/websocket/src/browser.rs b/transports/websocket/src/browser.rs index d646c8a2..1d591eef 100644 --- a/transports/websocket/src/browser.rs +++ b/transports/websocket/src/browser.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use futures::{future, stream}; use futures::stream::Then as StreamThen; use futures::sync::{mpsc, oneshot}; use futures::{Async, Future, Poll, Stream}; @@ -25,7 +26,6 @@ use multiaddr::{Protocol, Multiaddr}; use rw_stream_sink::RwStreamSink; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Read, Write}; -use std::iter; use std::sync::{Arc, Mutex}; use stdweb::web::TypedArray; use stdweb::{self, Reference}; @@ -53,8 +53,8 @@ impl BrowserWsConfig { impl Transport for BrowserWsConfig { type Output = BrowserWsConn; - type Listener = Box + Send>; // TODO: use `!` - type ListenerUpgrade = Box + Send>; // TODO: use `!` + type Listener = stream::Empty<(Self::ListenerUpgrade, Multiaddr), IoError>; + type ListenerUpgrade = future::Empty; type Dial = Box + Send>; #[inline]