diff --git a/core/src/swarm.rs b/core/src/swarm.rs deleted file mode 100644 index f7d95ee5..00000000 --- a/core/src/swarm.rs +++ /dev/null @@ -1,569 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use futures::stream::StreamFuture; -use futures::sync::oneshot; -use futures::{Async, Future, IntoFuture, Poll, Stream}; -use futures::task; -use parking_lot::Mutex; -use std::fmt; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::sync::Arc; -use {Multiaddr, MuxedTransport, Transport}; - -/// Creates a swarm. -/// -/// Requires an upgraded transport, and a function or closure that will turn the upgrade into a -/// `Future` that produces a `()`. -/// -/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to -/// control, and the `Future` must be driven to completion in order for things to work. -pub fn swarm( - transport: T, - handler: H, -) -> (SwarmController, SwarmEvents) -where - T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ - H: FnMut(T::Output, Box + Send>) -> F, - F: IntoFuture, -{ - let shared = Arc::new(Mutex::new(Shared { - next_incoming: transport.clone().next_incoming(), - listeners: Vec::new(), - listeners_upgrade: Vec::new(), - dialers: Vec::new(), - to_process: Vec::new(), - task_to_notify: None, - })); - - let future = SwarmEvents { - transport: transport.clone(), - shared: shared.clone(), - handler: handler, - }; - - let controller = SwarmController { - transport, - shared, - }; - - (controller, future) -} - -/// Allows control of what the swarm is doing. -pub struct SwarmController -where - T: MuxedTransport + 'static, // TODO: 'static :-/ -{ - /// Shared between the swarm infrastructure. - shared: Arc>>, - - /// Transport used to dial or listen. - transport: T, -} - -impl fmt::Debug for SwarmController -where - T: fmt::Debug + MuxedTransport + 'static, // TODO: 'static :-/ -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_tuple("SwarmController") - .field(&self.transport) - .finish() - } -} - -impl Clone for SwarmController -where - T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ -{ - fn clone(&self) -> Self { - SwarmController { - transport: self.transport.clone(), - shared: self.shared.clone(), - } - } -} - -impl SwarmController -where - T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ - T::Dial: Send, - T::MultiaddrFuture: Send, - T::Listener: Send, - T::ListenerUpgrade: Send, - T::Output: Send, - F: 'static, -{ - /// Asks the swarm to dial the node with the given multiaddress. The connection is then - /// upgraded using the `upgrade`, and the output is sent to the handler that was passed when - /// calling `swarm`. - /// - /// Returns a future that is signalled once the closure in the `swarm` has returned its future. - /// Therefore if the closure in the swarm has some side effect (eg. write something in a - /// variable), this side effect will be observable when this future succeeds. - #[inline] - pub fn dial(&self, multiaddr: Multiaddr, transport: Du) - -> Result, Multiaddr> - where - Du: Transport + 'static, // TODO: 'static :-/ - Du::Dial: Send, - Du::MultiaddrFuture: Send, - Du::Output: Into, - { - self.dial_then(multiaddr, transport, |v| v) - } - - /// Internal version of `dial` that allows adding a closure that is called after either the - /// dialing fails or the handler has been called with the resulting future. - /// - /// The returned future is filled with the output of `then`. - pub(crate) fn dial_then(&self, multiaddr: Multiaddr, transport: Du, then: TThen) - -> Result, Multiaddr> - where - Du: Transport + 'static, // TODO: 'static :-/ - Du::Dial: Send, - Du::MultiaddrFuture: Send, - Du::Output: Into, - TThen: FnOnce(Result<(), IoError>) -> Result<(), IoError> + Send + 'static, - { - trace!("Swarm dialing {}", multiaddr); - - match transport.dial(multiaddr.clone()) { - Ok(dial) => { - let (tx, rx) = oneshot::channel(); - let mut then = Some(move |val| { - let _ = tx.send(then(val)); - }); - // Unfortunately the `Box` type is still unusable in Rust right now, - // so we use a `Box` instead and panic if it is called multiple times. - let mut then = Box::new(move |val: Result<(), IoError>| { - let then = then.take().expect("The Boxed FnMut should only be called once"); - then(val); - }) as Box; - - let dial = dial.then(|result| { - match result { - Ok((output, client_addr)) => { - let client_addr = Box::new(client_addr) as Box + Send>; - Ok((output.into(), then, client_addr)) - } - Err(err) => { - debug!("Error in dialer upgrade: {:?}", err); - let err_clone = IoError::new(err.kind(), err.to_string()); - then(Err(err)); - Err(err_clone) - } - } - }); - - let mut shared = self.shared.lock(); - shared.dialers.push((multiaddr, Box::new(dial) as Box<_>)); - if let Some(task) = shared.task_to_notify.take() { - task.notify(); - } - - Ok(rx.then(|result| { - match result { - Ok(Ok(())) => Ok(()), - Ok(Err(err)) => Err(err), - Err(_) => Err(IoError::new(IoErrorKind::ConnectionAborted, - "dial cancelled the swarm future has been destroyed")), - } - })) - } - Err((_, multiaddr)) => Err(multiaddr), - } - } - - /// Interrupts all dialing attempts to a specific multiaddress. - /// - /// Has no effect if the dialing attempt has already succeeded, in which case it will be - /// dispatched to the handler. - pub fn interrupt_dial(&self, multiaddr: &Multiaddr) { - let mut shared = self.shared.lock(); - shared.dialers.retain(|dialer| { - &dialer.0 != multiaddr - }); - } - - /// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that - /// was passed to `swarm`. - // TODO: add a way to cancel a listener - pub fn listen_on(&self, multiaddr: Multiaddr) -> Result { - match self.transport.clone().listen_on(multiaddr) { - Ok((listener, new_addr)) => { - trace!("Swarm listening on {}", new_addr); - let mut shared = self.shared.lock(); - let listener = Box::new( - listener.map(|f| { - let f = f.map(|(out, maf)| { - (out, Box::new(maf) as Box + Send>) - }); - - Box::new(f) as Box + Send> - }), - ) as Box + Send>; - shared.listeners.push((new_addr.clone(), listener.into_future())); - if let Some(task) = shared.task_to_notify.take() { - task.notify(); - } - Ok(new_addr) - } - Err((_, multiaddr)) => Err(multiaddr), - } - } -} - -/// Future that must be driven to completion in order for the swarm to work. -#[must_use = "futures do nothing unless polled"] -pub struct SwarmEvents -where - T: MuxedTransport + 'static, // TODO: 'static :-/ -{ - /// Shared between the swarm infrastructure. - shared: Arc>>, - - /// The transport used to dial. - transport: T, - - /// Swarm handler. - handler: H, -} - -impl Stream for SwarmEvents -where - T: MuxedTransport + Clone + 'static, // TODO: 'static :-/, - T::MultiaddrFuture: Send, - T::IncomingUpgrade: Send, - H: FnMut(T::Output, Box + Send>) -> If, - If: IntoFuture, - F: Future + 'static, // TODO: 'static :-/ -{ - type Item = SwarmEvent; - type Error = IoError; - - fn poll(&mut self) -> Poll, Self::Error> { - let mut shared = self.shared.lock(); - let handler = &mut self.handler; - - loop { - match shared.next_incoming.poll() { - Ok(Async::Ready(connec)) => { - debug!("Swarm received new multiplexed incoming connection"); - shared.next_incoming = self.transport.clone().next_incoming(); - let connec = connec.map(|(out, maf)| { - (out, Box::new(maf) as Box + Send>) - }); - shared.listeners_upgrade.push(Box::new(connec) as Box<_>); - } - Ok(Async::NotReady) => break, - Err(err) => { - // TODO: should that stop everything? - debug!("Error in multiplexed incoming connection: {:?}", err); - shared.next_incoming = self.transport.clone().next_incoming(); - return Ok(Async::Ready(Some(SwarmEvent::IncomingError(err)))); - } - } - } - - // We remove each element from `shared.listeners` one by one and add them back only - // if relevant. - for n in (0 .. shared.listeners.len()).rev() { - let (listen_addr, mut listener) = shared.listeners.swap_remove(n); - loop { - match listener.poll() { - Ok(Async::Ready((Some(upgrade), remaining))) => { - trace!("Swarm received new connection on listener socket"); - shared.listeners_upgrade.push(upgrade); - listener = remaining.into_future(); - } - Ok(Async::Ready((None, _))) => { - debug!("Listener closed gracefully"); - return Ok(Async::Ready(Some(SwarmEvent::ListenerClosed { - listen_addr - }))); - }, - Err((error, _)) => { - debug!("Error in listener: {:?}", error); - return Ok(Async::Ready(Some(SwarmEvent::ListenerError { - listen_addr, - error, - }))); - } - Ok(Async::NotReady) => { - shared.listeners.push((listen_addr, listener)); - break; - } - } - } - } - - // We remove each element from `shared.listeners_upgrade` one by one and add them back - // only if relevant. - for n in (0 .. shared.listeners_upgrade.len()).rev() { - let mut listener_upgrade = shared.listeners_upgrade.swap_remove(n); - match listener_upgrade.poll() { - Ok(Async::Ready((output, client_addr))) => { - debug!("Successfully upgraded incoming connection"); - // TODO: unlock mutex before calling handler, in order to avoid deadlocks if - // the user does something stupid - shared.to_process.push(handler(output, client_addr).into_future()); - } - Err(err) => { - debug!("Error in listener upgrade: {:?}", err); - return Ok(Async::Ready(Some(SwarmEvent::ListenerUpgradeError(err)))); - } - Ok(Async::NotReady) => { - shared.listeners_upgrade.push(listener_upgrade); - }, - } - } - - // We remove each element from `shared.dialers` one by one and add them back only - // if relevant. - for n in (0 .. shared.dialers.len()).rev() { - let (client_addr, mut dialer) = shared.dialers.swap_remove(n); - match dialer.poll() { - Ok(Async::Ready((output, mut notifier, addr))) => { - trace!("Successfully upgraded dialed connection"); - // TODO: unlock mutex before calling handler, in order to avoid deadlocks if - // the user does something stupid - shared.to_process.push(handler(output, addr).into_future()); - notifier(Ok(())); - } - Err(error) => { - return Ok(Async::Ready(Some(SwarmEvent::DialFailed { - client_addr, - error, - }))); - }, - Ok(Async::NotReady) => { - shared.dialers.push((client_addr, dialer)); - }, - } - } - - // We remove each element from `shared.to_process` one by one and add them back only - // if relevant. - for n in (0 .. shared.to_process.len()).rev() { - let mut to_process = shared.to_process.swap_remove(n); - match to_process.poll() { - Ok(Async::Ready(())) => { - trace!("Future returned by swarm handler driven to completion"); - return Ok(Async::Ready(Some(SwarmEvent::HandlerFinished { - handler_future: to_process, - }))); - } - Err(error) => { - debug!("Error in processing: {:?}", error); - return Ok(Async::Ready(Some(SwarmEvent::HandlerError { - handler_future: to_process, - error, - }))); - } - Ok(Async::NotReady) => { - shared.to_process.push(to_process); - } - } - } - - // TODO: we never return `Ok(Ready)` because there's no way to know whether - // `next_incoming()` can produce anything more in the future ; also we would need to - // know when the controller has been dropped - shared.task_to_notify = Some(task::current()); - Ok(Async::NotReady) - } -} - -// TODO: stronger typing -struct Shared where T: MuxedTransport + 'static { - /// Next incoming substream on the transport. - next_incoming: T::Incoming, - - /// All the active listeners. - listeners: Vec<( - Multiaddr, - StreamFuture< - Box< - Stream< - Item = Box + Send>), Error = IoError> + Send>, - Error = IoError, - > + Send, - >, - >, - )>, - - /// Futures that upgrade an incoming listening connection to a full connection. - listeners_upgrade: - Vec + Send>), Error = IoError> + Send>>, - - /// Futures that dial a remote address. - /// - /// Contains the address we dial, so that we can cancel it if necessary. - dialers: Vec<(Multiaddr, Box) + Send>, Box + Send>), Error = IoError> + Send>)>, - - /// List of futures produced by the swarm closure. Must be processed to the end. - to_process: Vec, - - /// The task to notify whenever we add a new element in one of the lists. - /// Necessary so that the task wakes up and the element gets polled. - task_to_notify: Option, -} - -/// Event that happens in the swarm. -#[derive(Debug)] -pub enum SwarmEvent { - /// An error has happened while polling the muxed transport for incoming connections. - IncomingError(IoError), - - /// A listener has gracefully closed. - ListenerClosed { - /// Address the listener was listening on. - listen_addr: Multiaddr, - }, - - /// A listener has stopped because it produced an error. - ListenerError { - /// Address the listener was listening on. - listen_addr: Multiaddr, - /// The error that happened. - error: IoError, - }, - - /// An error happened while upgrading an incoming connection. - ListenerUpgradeError(IoError), - - /// Failed to dial a remote address. - DialFailed { - /// Address we were trying to dial. - client_addr: Multiaddr, - /// Error that happened. - error: IoError, - }, - - /// A future returned by the handler has finished. - HandlerFinished { - /// The future originally returned by the handler. - handler_future: F, - }, - - /// A future returned by the handler has produced an error. - HandlerError { - /// The future originally returned by the handler. - handler_future: F, - /// The error that happened. - error: IoError, - }, -} - -#[cfg(test)] -mod tests { - use futures::{Future, Stream, future}; - use rand; - use transport::{self, DeniedTransport, Transport}; - use std::io::Error as IoError; - use std::sync::{atomic, Arc}; - use swarm; - use tokio::runtime::current_thread; - - #[test] - fn transport_error_propagation_listen() { - let (swarm_ctrl, _swarm_future) = swarm(DeniedTransport, |_, _| future::empty()); - assert!(swarm_ctrl.listen_on("/ip4/127.0.0.1/tcp/10000".parse().unwrap()).is_err()); - } - - #[test] - fn transport_error_propagation_dial() { - let (swarm_ctrl, _swarm_future) = swarm(DeniedTransport, |_, _| future::empty()); - let addr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap(); - assert!(swarm_ctrl.dial(addr, DeniedTransport).is_err()); - } - - #[test] - fn basic_dial() { - let (tx, rx) = transport::connector(); - - let reached_tx = Arc::new(atomic::AtomicBool::new(false)); - let reached_tx2 = reached_tx.clone(); - - let reached_rx = Arc::new(atomic::AtomicBool::new(false)); - let reached_rx2 = reached_rx.clone(); - - let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), |_, _| { - reached_rx2.store(true, atomic::Ordering::SeqCst); - future::empty() - }); - swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap(); - - let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), |_, _| { - reached_tx2.store(true, atomic::Ordering::SeqCst); - future::empty() - }); - - let dial_success = swarm_ctrl2.dial("/memory".parse().unwrap(), tx).unwrap(); - let future = swarm_future2.for_each(|_| Ok(())) - .select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err) - .select(dial_success).map(|_| ()).map_err(|(err, _)| err); - - current_thread::Runtime::new().unwrap().block_on(future).unwrap(); - assert!(reached_tx.load(atomic::Ordering::SeqCst)); - assert!(reached_rx.load(atomic::Ordering::SeqCst)); - } - - #[test] - fn dial_multiple_times() { - let (tx, rx) = transport::connector(); - let reached = Arc::new(atomic::AtomicUsize::new(0)); - let reached2 = reached.clone(); - let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| { - reached2.fetch_add(1, atomic::Ordering::SeqCst); - future::empty() - }); - swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap(); - let num_dials = 20000 + rand::random::() % 20000; - let mut dials = Vec::new(); - for _ in 0 .. num_dials { - let f = swarm_ctrl.dial("/memory".parse().unwrap(), tx.clone()).unwrap(); - dials.push(f); - } - let future = future::join_all(dials) - .map(|_| ()) - .select(swarm_future.for_each(|_| Ok(()))) - .map_err(|(err, _)| err); - current_thread::Runtime::new().unwrap().block_on(future).unwrap(); - assert_eq!(reached.load(atomic::Ordering::SeqCst), num_dials); - } - - #[test] - fn future_isnt_dropped() { - // Tests that the future in the closure isn't being dropped. - let (tx, rx) = transport::connector(); - let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| { - future::empty() - .then(|_: Result<(), ()>| -> Result<(), IoError> { panic!() }) // <-- the test - }); - swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap(); - let dial_success = swarm_ctrl.dial("/memory".parse().unwrap(), tx).unwrap(); - let future = dial_success.select(swarm_future.for_each(|_| Ok(()))) - .map_err(|(err, _)| err); - current_thread::Runtime::new().unwrap().block_on(future).unwrap(); - } -} diff --git a/core/src/unique.rs b/core/src/unique.rs deleted file mode 100644 index 0e621587..00000000 --- a/core/src/unique.rs +++ /dev/null @@ -1,759 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use fnv::FnvHashMap; -use futures::{future, sync::oneshot, task, Async, Future, Poll, IntoFuture}; -use parking_lot::Mutex; -use {Multiaddr, MuxedTransport, SwarmController, Transport}; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::mem; -use std::sync::{Arc, Weak, atomic::AtomicUsize, atomic::Ordering}; -use transport::interruptible::Interrupt; - -/// Storage for a unique connection with a remote. -pub struct UniqueConnec { - inner: Arc>>, -} - -enum UniqueConnecInner { - /// The `UniqueConnec` was created, but nothing is in it. - Empty, - /// We started dialing, but no response has been obtained so far. - Pending { - /// Tasks that need to be awakened when the content of this object is set. - tasks_waiting: FnvHashMap, - /// Future that represents when `tie_*` should have been called. - // TODO: Send + Sync bound is meh - dial_fut: Box + Send + Sync>, - /// Dropping this object will automatically interrupt the dial, which is very useful if - /// we clear or drop the `UniqueConnec`. - interrupt: Interrupt, - }, - /// The value of this unique connec has been set. - /// Can only transition to `Empty` when the future has expired. - Full { - /// Content of the object. - value: T, - /// Sender to trigger if the content gets cleared. - on_clear: oneshot::Sender<()>, - }, - /// The `dial_fut` has errored. - Errored(IoError), -} - -impl UniqueConnec { - /// Builds a new empty `UniqueConnec`. - #[inline] - pub fn empty() -> Self { - UniqueConnec { - inner: Arc::new(Mutex::new(UniqueConnecInner::Empty)), - } - } - - /// Builds a new `UniqueConnec` that contains a value. - #[inline] - pub fn with_value(value: T) -> Self { - let (on_clear, _) = oneshot::channel(); - UniqueConnec { - inner: Arc::new(Mutex::new(UniqueConnecInner::Full { value, on_clear })), - } - } - - /// Instantly returns the value from the object if there is any. - pub fn poll(&self) -> Option - where T: Clone, - { - let inner = self.inner.lock(); - if let UniqueConnecInner::Full { ref value, .. } = &*inner { - Some(value.clone()) - } else { - None - } - } - - /// Loads the value from the object. - /// - /// If the object is empty or has errored earlier, dials the given multiaddress with the - /// given transport. - /// - /// The closure of the `swarm` is expected to call `tie_*()` on the `UniqueConnec`. Failure - /// to do so will make the `UniqueConnecFuture` produce an error. - /// - /// One critical property of this method, is that if a connection incomes and `tie_*` is - /// called, then it will be returned by the returned future. - #[inline] - pub fn dial(&self, swarm: &SwarmController, multiaddr: &Multiaddr, - transport: Du) -> UniqueConnecFuture - where T: Clone + Send + 'static, // TODO: 'static :-/ - Du: Transport + 'static, // TODO: 'static :-/ - Du::Output: Into, - Du::Dial: Send, - Du::MultiaddrFuture: Send, - S: Clone + MuxedTransport, - S::Dial: Send, - S::Listener: Send, - S::ListenerUpgrade: Send, - S::Output: Send, - S::MultiaddrFuture: Send, - F: 'static, - { - self.dial_inner(swarm, multiaddr, transport, true) - } - - /// Same as `dial`, except that the future will produce an error if an earlier attempt to dial - /// has errored. - #[inline] - pub fn dial_if_empty(&self, swarm: &SwarmController, multiaddr: &Multiaddr, - transport: Du) -> UniqueConnecFuture - where T: Clone + Send + 'static, // TODO: 'static :-/ - Du: Transport + 'static, // TODO: 'static :-/ - Du::Output: Into, - Du::Dial: Send, - Du::MultiaddrFuture: Send, - S: Clone + MuxedTransport, - S::Dial: Send, - S::Listener: Send, - S::ListenerUpgrade: Send, - S::Output: Send, - S::MultiaddrFuture: Send, - F: 'static, - { - self.dial_inner(swarm, multiaddr, transport, false) - } - - /// Inner implementation of `dial_*`. - fn dial_inner(&self, swarm: &SwarmController, multiaddr: &Multiaddr, - transport: Du, dial_if_err: bool) -> UniqueConnecFuture - where T: Clone + Send + 'static, // TODO: 'static :-/ - Du: Transport + 'static, // TODO: 'static :-/ - Du::Output: Into, - Du::Dial: Send, - Du::MultiaddrFuture: Send, - S: Clone + MuxedTransport, - S::Dial: Send, - S::Listener: Send, - S::ListenerUpgrade: Send, - S::Output: Send, - S::MultiaddrFuture: Send, - F: 'static, - { - let mut inner = self.inner.lock(); - match &*inner { - UniqueConnecInner::Empty => (), - UniqueConnecInner::Errored(_) if dial_if_err => (), - _ => return UniqueConnecFuture { inner: Arc::downgrade(&self.inner) }, - }; - - let weak_inner = Arc::downgrade(&self.inner); - - let (transport, interrupt) = transport.interruptible(); - let dial_fut = swarm.dial_then(multiaddr.clone(), transport, - move |val: Result<(), IoError>| { - let inner = match weak_inner.upgrade() { - Some(i) => i, - None => return val - }; - - let mut inner = inner.lock(); - if let UniqueConnecInner::Full { .. } = *inner { - return val; - } - - let new_val = UniqueConnecInner::Errored(match val { - Ok(()) => IoError::new(IoErrorKind::ConnectionRefused, - "dialing has succeeded but tie_* hasn't been called"), - Err(ref err) => IoError::new(err.kind(), err.to_string()), - }); - - match mem::replace(&mut *inner, new_val) { - UniqueConnecInner::Pending { tasks_waiting, .. } => { - for task in tasks_waiting { - task.1.notify(); - } - }, - _ => () - }; - - val - }); - - let dial_fut = dial_fut - .map_err(|_| IoError::new(IoErrorKind::Other, "multiaddress not supported")) - .into_future() - .flatten(); - - *inner = UniqueConnecInner::Pending { - tasks_waiting: Default::default(), - dial_fut: Box::new(dial_fut), - interrupt, - }; - - UniqueConnecFuture { inner: Arc::downgrade(&self.inner) } - } - - /// Puts `value` inside the object. - /// Additionally, the `UniqueConnec` will be tied to the `until` future. When the future drops - /// or finishes, the `UniqueConnec` is automatically cleared. If the `UniqueConnec` is cleared - /// by the user, the future automatically stops. - /// The returned future is an adjusted version of that same future. - /// - /// If the object already contains something, then `until` is dropped and a dummy future that - /// immediately ends is returned. - pub fn tie_or_stop(&self, value: T, until: F) -> impl Future - where F: Future - { - self.tie_inner(value, until, false) - } - - /// Same as `tie_or_stop`, except that is if the object already contains something, then - /// `until` is returned immediately and can live in parallel. - pub fn tie_or_passthrough(&self, value: T, until: F) -> impl Future - where F: Future - { - self.tie_inner(value, until, true) - } - - /// Inner implementation of `tie_*`. - fn tie_inner(&self, value: T, until: F, pass_through: bool) -> impl Future - where F: Future - { - let mut tasks_to_notify = Default::default(); - - let mut inner = self.inner.lock(); - let (on_clear, on_clear_rx) = oneshot::channel(); - match mem::replace(&mut *inner, UniqueConnecInner::Full { value, on_clear }) { - UniqueConnecInner::Empty => {}, - UniqueConnecInner::Errored(_) => {}, - UniqueConnecInner::Pending { tasks_waiting, .. } => { - tasks_to_notify = tasks_waiting; - }, - old @ UniqueConnecInner::Full { .. } => { - // Keep the old value. - *inner = old; - if pass_through { - return future::Either::B(future::Either::A(until)); - } else { - return future::Either::B(future::Either::B(future::ok(()))); - } - }, - }; - drop(inner); - - struct Cleaner(Weak>>); - impl Drop for Cleaner { - #[inline] - fn drop(&mut self) { - if let Some(inner) = self.0.upgrade() { - *inner.lock() = UniqueConnecInner::Empty; - } - } - } - let cleaner = Cleaner(Arc::downgrade(&self.inner)); - - // The mutex is unlocked when we notify the pending tasks. - for task in tasks_to_notify { - task.1.notify(); - } - - let fut = until - .select(on_clear_rx.then(|_| Ok(()))) - .map(|((), _)| ()) - .map_err(|(err, _)| err) - .then(move |val| { - drop(cleaner); // Make sure that `cleaner` gets called there. - val - }); - future::Either::A(fut) - } - - /// Clears the content of the object. - /// - /// Has no effect if the content is empty or pending. - /// If the node was full, calling `clear` will stop the future returned by `tie_*`. - pub fn clear(&self) { - let mut inner = self.inner.lock(); - match mem::replace(&mut *inner, UniqueConnecInner::Empty) { - UniqueConnecInner::Empty => {}, - UniqueConnecInner::Errored(_) => {}, - pending @ UniqueConnecInner::Pending { .. } => { - *inner = pending; - }, - UniqueConnecInner::Full { on_clear, .. } => { - // TODO: Should we really replace the `Full` with an `Empty` here? What about - // letting dropping the future clear the connection automatically? Otherwise - // it is possible that the user dials before the future gets dropped, in which - // case the future dropping will set the value to `Empty`. But on the other hand, - // it is expected that `clear()` is instantaneous and if it is followed with - // `dial()` then it should dial. - let _ = on_clear.send(()); - }, - }; - } - - /// Returns the state of the object. - /// - /// Note that this can be racy, as the object can be used at the same time. In other words, - /// the returned value may no longer reflect the actual state. - pub fn state(&self) -> UniqueConnecState { - match *self.inner.lock() { - UniqueConnecInner::Empty => UniqueConnecState::Empty, - UniqueConnecInner::Errored(_) => UniqueConnecState::Errored, - UniqueConnecInner::Pending { .. } => UniqueConnecState::Pending, - UniqueConnecInner::Full { .. } => UniqueConnecState::Full, - } - } - - /// Returns true if the object has a pending or active connection. Returns false if the object - /// is empty or the connection has errored earlier. - #[inline] - pub fn is_alive(&self) -> bool { - match self.state() { - UniqueConnecState::Empty => false, - UniqueConnecState::Errored => false, - UniqueConnecState::Pending => true, - UniqueConnecState::Full => true, - } - } -} - -impl Clone for UniqueConnec { - #[inline] - fn clone(&self) -> UniqueConnec { - UniqueConnec { - inner: self.inner.clone(), - } - } -} - -impl Default for UniqueConnec { - #[inline] - fn default() -> Self { - UniqueConnec::empty() - } -} - -impl Drop for UniqueConnec { - fn drop(&mut self) { - // Notify the waiting futures if we are the last `UniqueConnec`. - if let Some(inner) = Arc::get_mut(&mut self.inner) { - match *inner.get_mut() { - UniqueConnecInner::Pending { ref mut tasks_waiting, .. } => { - for task in tasks_waiting.drain() { - task.1.notify(); - } - }, - _ => () - } - } - } -} - -/// Future returned by `UniqueConnec::dial()`. -#[must_use = "futures do nothing unless polled"] -pub struct UniqueConnecFuture { - inner: Weak>>, -} - -impl Future for UniqueConnecFuture - where T: Clone -{ - type Item = T; - type Error = IoError; - - fn poll(&mut self) -> Poll { - let inner = match self.inner.upgrade() { - Some(inner) => inner, - // All the `UniqueConnec` have been destroyed. - None => return Err(IoErrorKind::ConnectionAborted.into()), - }; - - let mut inner = inner.lock(); - match mem::replace(&mut *inner, UniqueConnecInner::Empty) { - UniqueConnecInner::Empty => { - // This can happen if `tie_*()` is called, and the future expires before the - // future returned by `dial()` gets polled. This means that the connection has been - // closed. - Err(IoErrorKind::ConnectionAborted.into()) - }, - UniqueConnecInner::Pending { mut tasks_waiting, mut dial_fut, interrupt } => { - match dial_fut.poll() { - Ok(Async::Ready(())) => { - // This happens if we successfully dialed a remote, but the callback - // doesn't call `tie_*`. This can be a logic error by the user, - // but could also indicate that the user decided to filter out this - // connection for whatever reason. - *inner = UniqueConnecInner::Errored(IoErrorKind::ConnectionAborted.into()); - Err(IoErrorKind::ConnectionAborted.into()) - }, - Ok(Async::NotReady) => { - static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); - task_local! { - static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) - } - tasks_waiting.insert(TASK_ID.with(|&k| k), task::current()); - *inner = UniqueConnecInner::Pending { tasks_waiting, dial_fut, interrupt }; - Ok(Async::NotReady) - } - Err(err) => { - let tr = IoError::new(err.kind(), err.to_string()); - *inner = UniqueConnecInner::Errored(err); - Err(tr) - }, - } - }, - UniqueConnecInner::Full { value, on_clear } => { - *inner = UniqueConnecInner::Full { - value: value.clone(), - on_clear - }; - Ok(Async::Ready(value)) - }, - UniqueConnecInner::Errored(err) => { - let tr = IoError::new(err.kind(), err.to_string()); - *inner = UniqueConnecInner::Errored(err); - Err(tr) - }, - } - } -} - -/// State of a `UniqueConnec`. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum UniqueConnecState { - /// The object is empty. - Empty, - /// `dial` has been called and we are waiting for `tie_*` to be called. - Pending, - /// `tie_*` has been called. - Full, - /// The future returned by the closure of `dial` has errored or has finished before - /// `tie_*` has been called. - Errored, -} - -#[cfg(test)] -mod tests { - use futures::{future, sync::oneshot, Future, Stream}; - use transport::DeniedTransport; - use std::io::Error as IoError; - use std::sync::{Arc, atomic}; - use std::time::Duration; - use {UniqueConnec, UniqueConnecState}; - use {swarm, transport, Transport}; - use tokio::runtime::current_thread; - use tokio_timer; - - #[test] - fn basic_working() { - // Checks the basic working of the `UniqueConnec`. - let (tx, rx) = transport::connector(); - let unique_connec = UniqueConnec::empty(); - let unique_connec2 = unique_connec.clone(); - assert_eq!(unique_connec.state(), UniqueConnecState::Empty); - - let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| { - // Note that this handles both the dial and the listen. - assert!(unique_connec2.is_alive()); - unique_connec2.tie_or_stop(12, future::empty()) - }); - swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap(); - - let dial_success = unique_connec - .dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx) - .map(|val| { assert_eq!(val, 12); }); - assert_eq!(unique_connec.state(), UniqueConnecState::Pending); - - let future = dial_success.select(swarm_future.for_each(|_| Ok(()))).map_err(|(err, _)| err); - current_thread::Runtime::new().unwrap().block_on(future).unwrap(); - assert_eq!(unique_connec.state(), UniqueConnecState::Full); - } - - #[test] - fn invalid_multiaddr_produces_error() { - // Tests that passing an invalid multiaddress generates an error. - let unique = UniqueConnec::empty(); - assert_eq!(unique.state(), UniqueConnecState::Empty); - let unique2 = unique.clone(); - let (swarm_ctrl, _swarm_fut) = swarm(DeniedTransport, |_, _| { - unique2.tie_or_stop((), future::empty()) - }); - let fut = unique.dial(&swarm_ctrl, &"/ip4/1.2.3.4".parse().unwrap(), DeniedTransport); - assert!(fut.wait().is_err()); - assert_eq!(unique.state(), UniqueConnecState::Errored); - } - - #[test] - fn tie_or_stop_stops() { - // Tests that `tie_or_stop` destroys additional futures passed to it. - let (tx, rx) = transport::connector(); - let unique_connec = UniqueConnec::empty(); - let unique_connec2 = unique_connec.clone(); - - // This channel is used to detect whether the future has been dropped. - let (msg_tx, msg_rx) = oneshot::channel(); - - let mut num_connec = 0; - let mut msg_rx = Some(msg_rx); - let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| { - num_connec += 1; - if num_connec == 1 { - unique_connec2.tie_or_stop(12, future::Either::A(future::empty())) - } else { - let fut = msg_rx.take().unwrap().map_err(|_| panic!()); - unique_connec2.tie_or_stop(13, future::Either::B(fut)) - } - }); - swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap(); - - let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| { - future::empty() - }); - - let dial_success = unique_connec - .dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx.clone()) - .map(|val| { assert_eq!(val, 12); }) - .inspect({ - let c = unique_connec.clone(); - move |_| { assert!(c.is_alive()); } - }) - .and_then(|_| { - tokio_timer::sleep(Duration::from_secs(1)) - .map_err(|_| unreachable!()) - }) - .and_then(move |_| { - swarm_ctrl2.dial("/memory".parse().unwrap(), tx) - .unwrap_or_else(|_| panic!()) - }) - .inspect({ - let c = unique_connec.clone(); - move |_| { - assert_eq!(c.poll(), Some(12)); // Not 13 - assert!(msg_tx.send(()).is_err()); - } - }); - - let future = dial_success - .select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err) - .select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err); - - current_thread::Runtime::new().unwrap().block_on(future).unwrap(); - assert!(unique_connec.is_alive()); - } - - #[test] - fn tie_or_passthrough_passes_through() { - // Tests that `tie_or_passthrough` doesn't delete additional futures passed to it when - // it is already full, and doesn't gets its value modified when that happens. - let (tx, rx) = transport::connector(); - let unique_connec = UniqueConnec::empty(); - let unique_connec2 = unique_connec.clone(); - - let mut num = 12; - let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), move |_, _| { - // Note that this handles both the dial and the listen. - let fut = future::empty().then(|_: Result<(), ()>| -> Result<(), IoError> { panic!() }); - num += 1; - unique_connec2.tie_or_passthrough(num, fut) - }); - swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap(); - - let dial_success = unique_connec - .dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx.clone()) - .map(|val| { assert_eq!(val, 13); }); - - swarm_ctrl.dial("/memory".parse().unwrap(), tx) - .unwrap(); - - let future = dial_success.select(swarm_future.for_each(|_| Ok(()))).map_err(|(err, _)| err); - current_thread::Runtime::new().unwrap().block_on(future).unwrap(); - assert_eq!(unique_connec.poll(), Some(13)); - } - - #[test] - fn cleared_when_future_drops() { - // Tests that the `UniqueConnec` gets cleared when the future we associate with it gets - // destroyed. - let (tx, rx) = transport::connector(); - let unique_connec = UniqueConnec::empty(); - let unique_connec2 = unique_connec.clone(); - - let (msg_tx, msg_rx) = oneshot::channel(); - let mut msg_rx = Some(msg_rx); - - let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| { - future::empty() - }); - swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap(); - - let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| { - let fut = msg_rx.take().unwrap().map_err(|_| -> IoError { unreachable!() }); - unique_connec2.tie_or_stop(12, fut) - }); - - let dial_success = unique_connec - .dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx) - .map(|val| { assert_eq!(val, 12); }) - .inspect({ - let c = unique_connec.clone(); - move |_| { assert!(c.is_alive()); } - }) - .and_then(|_| { - msg_tx.send(()).unwrap(); - tokio_timer::sleep(Duration::from_secs(1)) - .map_err(|_| unreachable!()) - }) - .inspect({ - let c = unique_connec.clone(); - move |_| { assert!(!c.is_alive()); } - }); - - let future = dial_success - .select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err) - .select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err); - - current_thread::Runtime::new().unwrap().block_on(future).unwrap(); - assert!(!unique_connec.is_alive()); - } - - #[test] - fn future_drops_when_cleared() { - // Tests that the future returned by `tie_or_*` ends when the `UniqueConnec` get cleared. - let (tx, rx) = transport::connector(); - let unique_connec = UniqueConnec::empty(); - let unique_connec2 = unique_connec.clone(); - - let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| { - future::empty() - }); - swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap(); - - let finished = Arc::new(atomic::AtomicBool::new(false)); - let finished2 = finished.clone(); - let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| { - let finished2 = finished2.clone(); - unique_connec2.tie_or_stop(12, future::empty()).then(move |v| { - finished2.store(true, atomic::Ordering::Relaxed); - v - }) - }); - - let dial_success = unique_connec - .dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx) - .map(|val| { assert_eq!(val, 12); }) - .inspect({ - let c = unique_connec.clone(); - move |_| { - assert!(c.is_alive()); - c.clear(); - assert!(!c.is_alive()); - } - }) - .and_then(|_| { - tokio_timer::sleep(Duration::from_secs(1)) - .map_err(|_| unreachable!()) - }) - .inspect({ - let c = unique_connec.clone(); - move |_| { - assert!(finished.load(atomic::Ordering::Relaxed)); - assert!(!c.is_alive()); - } - }); - - let future = dial_success - .select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err) - .select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err); - - current_thread::Runtime::new().unwrap().block_on(future).unwrap(); - assert!(!unique_connec.is_alive()); - } - - #[test] - fn future_drops_when_destroyed() { - // Tests that the future returned by `tie_or_*` ends when the `UniqueConnec` get dropped. - let (tx, rx) = transport::connector(); - let unique_connec = UniqueConnec::empty(); - let mut unique_connec2 = Some(unique_connec.clone()); - - let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| { - future::empty() - }); - swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap(); - - let finished = Arc::new(atomic::AtomicBool::new(false)); - let finished2 = finished.clone(); - let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| { - let finished2 = finished2.clone(); - unique_connec2.take().unwrap().tie_or_stop(12, future::empty()).then(move |v| { - finished2.store(true, atomic::Ordering::Relaxed); - v - }) - }); - - let dial_success = unique_connec - .dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx) - .map(|val| { assert_eq!(val, 12); }) - .inspect(move |_| { - assert!(unique_connec.is_alive()); - drop(unique_connec); - }) - .and_then(|_| { - tokio_timer::sleep(Duration::from_secs(1)) - .map_err(|_| unreachable!()) - }) - .inspect(move |_| { - assert!(finished.load(atomic::Ordering::Relaxed)); - }); - - let future = dial_success - .select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err) - .select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err); - - current_thread::Runtime::new().unwrap().block_on(future).unwrap(); - } - - #[test] - fn error_if_unique_connec_destroyed_before_future() { - // Tests that the future returned by `dial` returns an error if the `UniqueConnec` no - // longer exists. - let (tx, rx) = transport::connector(); - - let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), move |_, _| { - future::empty() - }); - swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap(); - - let unique_connec = UniqueConnec::empty(); - let dial_success = unique_connec - .dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx) - .then(|val: Result<(), IoError>| { - assert!(val.is_err()); - Ok(()) - }); - drop(unique_connec); - - let future = dial_success - .select(swarm_future.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err); - current_thread::Runtime::new().unwrap().block_on(future).unwrap(); - } - - // TODO: test that dialing is interrupted when UniqueConnec is cleared - // TODO: test that dialing is interrupted when UniqueConnec is dropped -}