diff --git a/core/Cargo.toml b/core/Cargo.toml index 8a476a52..c9391ec1 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -25,5 +25,7 @@ libp2p-ping = { path = "../ping" } libp2p-tcp-transport = { path = "../tcp-transport" } libp2p-mplex = { path = "../mplex" } rand = "0.5" +tokio = "0.1" tokio-codec = "0.1" tokio-current-thread = "0.1" +tokio-timer = "0.2" diff --git a/core/src/lib.rs b/core/src/lib.rs index a164dc67..46fc2812 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -227,9 +227,13 @@ extern crate tokio_io; #[cfg(test)] extern crate rand; #[cfg(test)] +extern crate tokio; +#[cfg(test)] extern crate tokio_codec; #[cfg(test)] extern crate tokio_current_thread; +#[cfg(test)] +extern crate tokio_timer; /// Multi-address re-export. pub extern crate multiaddr; diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 7347506d..bd6b703a 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -18,11 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::stream::{FuturesUnordered, StreamFuture}; -use futures::sync::{mpsc, oneshot}; -use futures::{future, Async, Future, IntoFuture, Poll, Stream}; +use futures::stream::StreamFuture; +use futures::sync::oneshot; +use futures::{Async, Future, IntoFuture, Poll, Stream}; +use futures::task; +use parking_lot::Mutex; use std::fmt; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::sync::Arc; use {Multiaddr, MuxedTransport, Transport}; /// Creates a swarm. @@ -32,38 +35,33 @@ use {Multiaddr, MuxedTransport, Transport}; /// /// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to /// control, and the `Future` must be driven to completion in order for things to work. -/// pub fn swarm( transport: T, handler: H, -) -> (SwarmController, SwarmFuture) +) -> (SwarmController, SwarmFuture) where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ H: FnMut(T::Output, Box>) -> F, F: IntoFuture, { - let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded(); - let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded(); - let (new_toprocess_tx, new_toprocess_rx) = mpsc::unbounded(); + let shared = Arc::new(Mutex::new(Shared { + next_incoming: transport.clone().next_incoming(), + listeners: Vec::new(), + listeners_upgrade: Vec::new(), + dialers: Vec::new(), + to_process: Vec::new(), + task_to_notify: None, + })); let future = SwarmFuture { transport: transport.clone(), + shared: shared.clone(), handler: handler, - new_listeners: new_listeners_rx, - next_incoming: transport.clone().next_incoming(), - listeners: FuturesUnordered::new(), - listeners_upgrade: FuturesUnordered::new(), - dialers: FuturesUnordered::new(), - new_dialers: new_dialers_rx, - to_process: FuturesUnordered::new(), - new_toprocess: new_toprocess_rx, }; let controller = SwarmController { - transport: transport, - new_listeners: new_listeners_tx, - new_dialers: new_dialers_tx, - new_toprocess: new_toprocess_tx, + transport, + shared, }; (controller, future) @@ -74,10 +72,11 @@ pub struct SwarmController where T: MuxedTransport + 'static, // TODO: 'static :-/ { + /// Shared between the swarm infrastructure. + shared: Arc>>, + + /// Transport used to dial or listen. transport: T, - new_listeners: mpsc::UnboundedSender, - new_dialers: mpsc::UnboundedSender>, Box>), Error = ()>>>, - new_toprocess: mpsc::UnboundedSender>>, } impl fmt::Debug for SwarmController @@ -98,9 +97,7 @@ where fn clone(&self) -> SwarmController { SwarmController { transport: self.transport.clone(), - new_listeners: self.new_listeners.clone(), - new_dialers: self.new_dialers.clone(), - new_toprocess: self.new_toprocess.clone(), + shared: self.shared.clone(), } } } @@ -116,33 +113,62 @@ where /// Returns a future that is signalled once the closure in the `swarm` has returned its future. /// Therefore if the closure in the swarm has some side effect (eg. write something in a /// variable), this side effect will be observable when this future succeeds. + #[inline] pub fn dial(&self, multiaddr: Multiaddr, transport: Du) -> Result, Multiaddr> where Du: Transport + 'static, // TODO: 'static :-/ Du::Output: Into, + { + self.dial_then(multiaddr, transport, |v| v) + } + + /// Internal version of `dial` that allows adding a closure that is called after either the + /// dialing fails or the handler has been called with the resulting future. + /// + /// The returned future is filled with the output of `then`. + pub(crate) fn dial_then(&self, multiaddr: Multiaddr, transport: Du, then: F) + -> Result, Multiaddr> + where + Du: Transport + 'static, // TODO: 'static :-/ + Du::Output: Into, + F: FnOnce(Result<(), IoError>) -> Result<(), IoError> + 'static, { trace!("Swarm dialing {}", multiaddr); match transport.dial(multiaddr.clone()) { Ok(dial) => { let (tx, rx) = oneshot::channel(); + let mut then = Some(move |val| { + let _ = tx.send(then(val)); + }); + // Unfortunately the `Box` type is still unusable in Rust right now, + // so we use a `Box` instead and panic if it is called multiple times. + let mut then = Box::new(move |val: Result<(), IoError>| { + let then = then.take().expect("The Boxed FnMut should only be called once"); + then(val); + }) as Box; + let dial = dial.then(|result| { match result { Ok((output, client_addr)) => { let client_addr = Box::new(client_addr) as Box>; - Ok((output.into(), tx, client_addr)) + Ok((output.into(), then, client_addr)) } Err(err) => { debug!("Error in dialer upgrade: {:?}", err); - let _ = tx.send(Err(err)); + then(Err(err)); Err(()) } } }); - // Ignoring errors if the receiver has been closed, because in that situation - // nothing is going to be processed anyway. - let _ = self.new_dialers.unbounded_send(Box::new(dial) as Box<_>); + + let mut shared = self.shared.lock(); + shared.dialers.push((multiaddr, Box::new(dial) as Box<_>)); + if let Some(task) = shared.task_to_notify.take() { + task.notify(); + } + Ok(rx.then(|result| { match result { Ok(Ok(())) => Ok(()), @@ -156,15 +182,38 @@ where } } + /// Interrupts all dialing attempts to a specific multiaddress. + /// + /// Has no effect if the dialing attempt has already succeeded, in which case it will be + /// dispatched to the handler. + pub fn interrupt_dial(&self, multiaddr: &Multiaddr) { + let mut shared = self.shared.lock(); + shared.dialers.retain(|dialer| { + &dialer.0 != multiaddr + }); + } + /// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that /// was passed to `swarm`. + // TODO: add a way to cancel a listener pub fn listen_on(&self, multiaddr: Multiaddr) -> Result { match self.transport.clone().listen_on(multiaddr) { Ok((listener, new_addr)) => { trace!("Swarm listening on {}", new_addr); - // Ignoring errors if the receiver has been closed, because in that situation - // nothing is going to be processed anyway. - let _ = self.new_listeners.unbounded_send(listener); + let mut shared = self.shared.lock(); + let listener = Box::new( + listener.map(|f| { + let f = f.map(|(out, maf)| { + (out, Box::new(maf) as Box>) + }); + + Box::new(f) as Box> + }), + ) as Box>; + shared.listeners.push(listener.into_future()); + if let Some(task) = shared.task_to_notify.take() { + task.notify(); + } Ok(new_addr) } Err((_, multiaddr)) => Err(multiaddr), @@ -173,15 +222,152 @@ where } /// Future that must be driven to completion in order for the swarm to work. -pub struct SwarmFuture +pub struct SwarmFuture where T: MuxedTransport + 'static, // TODO: 'static :-/ { + /// Shared between the swarm infrastructure. + shared: Arc>>, + + /// The transport used to dial. transport: T, + + /// Swarm handler. handler: H, - new_listeners: mpsc::UnboundedReceiver, +} + +impl Future for SwarmFuture +where + T: MuxedTransport + Clone + 'static, // TODO: 'static :-/, + H: FnMut(T::Output, Box>) -> If, + If: IntoFuture, + F: Future + 'static, // TODO: 'static :-/ +{ + type Item = (); + type Error = IoError; + + fn poll(&mut self) -> Poll { + let mut shared = self.shared.lock(); + let handler = &mut self.handler; + + loop { + match shared.next_incoming.poll() { + Ok(Async::Ready(connec)) => { + debug!("Swarm received new multiplexed incoming connection"); + shared.next_incoming = self.transport.clone().next_incoming(); + let connec = connec.map(|(out, maf)| { + (out, Box::new(maf) as Box>) + }); + shared.listeners_upgrade.push(Box::new(connec) as Box<_>); + } + Ok(Async::NotReady) => break, + Err(err) => { + // TODO: should that stop everything? + debug!("Error in multiplexed incoming connection: {:?}", err); + shared.next_incoming = self.transport.clone().next_incoming(); + break; + } + } + } + + // We remove each element from `shared.listeners` one by one and add them back only + // if relevant. + for n in (0 .. shared.listeners.len()).rev() { + let mut listener = shared.listeners.swap_remove(n); + loop { + match listener.poll() { + Ok(Async::Ready((Some(upgrade), remaining))) => { + trace!("Swarm received new connection on listener socket"); + shared.listeners_upgrade.push(upgrade); + listener = remaining.into_future(); + } + Ok(Async::Ready((None, _))) => { + debug!("Listener closed gracefully"); + break; + }, + Err((err, _)) => { + debug!("Error in listener: {:?}", err); + break; + } + Ok(Async::NotReady) => { + shared.listeners.push(listener); + break; + } + } + } + } + + // We remove each element from `shared.listeners_upgrade` one by one and add them back + // only if relevant. + for n in (0 .. shared.listeners_upgrade.len()).rev() { + let mut listener_upgrade = shared.listeners_upgrade.swap_remove(n); + match listener_upgrade.poll() { + Ok(Async::Ready((output, client_addr))) => { + debug!("Successfully upgraded incoming connection"); + // TODO: unlock mutex before calling handler, in order to avoid deadlocks if + // the user does something stupid + shared.to_process.push(Box::new(handler(output, client_addr).into_future())); + } + Err(err) => { + debug!("Error in listener upgrade: {:?}", err); + } + Ok(Async::NotReady) => { + shared.listeners_upgrade.push(listener_upgrade); + }, + } + } + + // We remove each element from `shared.dialers` one by one and add them back only + // if relevant. + for n in (0 .. shared.dialers.len()).rev() { + let (client_addr, mut dialer) = shared.dialers.swap_remove(n); + match dialer.poll() { + Ok(Async::Ready((output, mut notifier, addr))) => { + trace!("Successfully upgraded dialed connection"); + // TODO: unlock mutex before calling handler, in order to avoid deadlocks if + // the user does something stupid + shared.to_process.push(Box::new(handler(output, addr).into_future())); + notifier(Ok(())); + } + Err(()) => {}, + Ok(Async::NotReady) => { + shared.dialers.push((client_addr, dialer)); + }, + } + } + + // We remove each element from `shared.to_process` one by one and add them back only + // if relevant. + for n in (0 .. shared.to_process.len()).rev() { + let mut to_process = shared.to_process.swap_remove(n); + match to_process.poll() { + Ok(Async::Ready(())) => { + trace!("Future returned by swarm handler driven to completion"); + } + Err(err) => { + debug!("Error in processing: {:?}", err); + } + Ok(Async::NotReady) => { + shared.to_process.push(to_process); + } + } + } + + // TODO: we never return `Ok(Ready)` because there's no way to know whether + // `next_incoming()` can produce anything more in the future ; also we would need to + // know when the controller has been dropped + shared.task_to_notify = Some(task::current()); + Ok(Async::NotReady) + } +} + +// TODO: stronger typing +struct Shared where T: MuxedTransport + 'static { + /// Next incoming substream on the transport. next_incoming: T::Incoming, - listeners: FuturesUnordered< + + /// All the active listeners. + listeners: Vec< StreamFuture< Box< Stream< @@ -191,162 +377,33 @@ where >, >, >, + + /// Futures that upgrade an incoming listening connection to a full connection. listeners_upgrade: - FuturesUnordered>), Error = IoError>>>, - dialers: FuturesUnordered>, Box>), Error = ()>>>, - new_dialers: - mpsc::UnboundedReceiver>, Box>), Error = ()>>>, - to_process: FuturesUnordered>>>, - new_toprocess: mpsc::UnboundedReceiver>>, -} + Vec>), Error = IoError>>>, -impl Future for SwarmFuture -where - T: MuxedTransport + Clone + 'static, // TODO: 'static :-/, - H: FnMut(T::Output, Box>) -> If, - If: IntoFuture, - F: Future, -{ - type Item = (); - type Error = IoError; + /// Futures that dial a remote address. + /// + /// Contains the address we dial, so that we can cancel it if necessary. + dialers: Vec<(Multiaddr, Box)>, Box>), Error = ()>>)>, - fn poll(&mut self) -> Poll { - let handler = &mut self.handler; + /// List of futures produced by the swarm closure. Must be processed to the end. + to_process: Vec>>, - loop { - match self.next_incoming.poll() { - Ok(Async::Ready(connec)) => { - debug!("Swarm received new multiplexed incoming connection"); - self.next_incoming = self.transport.clone().next_incoming(); - let connec = connec.map(|(out, maf)| { - (out, Box::new(maf) as Box>) - }); - self.listeners_upgrade.push(Box::new(connec) as Box<_>); - } - Ok(Async::NotReady) => break, - Err(err) => { - debug!("Error in multiplexed incoming connection: {:?}", err); - self.next_incoming = self.transport.clone().next_incoming(); - break; - } - } - } - - loop { - match self.new_listeners.poll() { - Ok(Async::Ready(Some(new_listener))) => { - let new_listener = Box::new( - new_listener.map(|f| { - let f = f.map(|(out, maf)| { - (out, Box::new(maf) as Box>) - }); - - Box::new(f) as Box> - }), - ) as Box>; - self.listeners.push(new_listener.into_future()); - } - Ok(Async::Ready(None)) | Err(_) => { - // New listener sender has been closed. - break; - } - Ok(Async::NotReady) => break, - } - } - - loop { - match self.new_dialers.poll() { - Ok(Async::Ready(Some(new_dialer))) => { - self.dialers.push(new_dialer); - } - Ok(Async::Ready(None)) | Err(_) => { - // New dialers sender has been closed. - break - } - Ok(Async::NotReady) => break, - } - } - - loop { - match self.new_toprocess.poll() { - Ok(Async::Ready(Some(new_toprocess))) => { - self.to_process.push(future::Either::B(new_toprocess)); - } - Ok(Async::Ready(None)) | Err(_) => { - // New to-process sender has been closed. - break - } - Ok(Async::NotReady) => break, - } - } - - loop { - match self.listeners.poll() { - Ok(Async::Ready(Some((Some(upgrade), remaining)))) => { - trace!("Swarm received new connection on listener socket"); - self.listeners_upgrade.push(upgrade); - self.listeners.push(remaining.into_future()); - } - Err((err, _)) => { - debug!("Error in listener: {:?}", err); - break - } - _ => break - } - } - - loop { - match self.listeners_upgrade.poll() { - Ok(Async::Ready(Some((output, client_addr)))) => { - debug!("Successfully upgraded incoming connection"); - self.to_process.push(future::Either::A( - handler(output, client_addr).into_future(), - )); - } - Err(err) => { - debug!("Error in listener upgrade: {:?}", err); - break; - } - _ => break - } - } - - loop { - match self.dialers.poll() { - Ok(Async::Ready(Some((output, notifier, addr)))) => { - trace!("Successfully upgraded dialed connection"); - self.to_process - .push(future::Either::A(handler(output, addr).into_future())); - let _ = notifier.send(Ok(())); - } - Err(()) => break, - _ => break, - } - } - - loop { - match self.to_process.poll() { - Ok(Async::Ready(Some(()))) => { - trace!("Future returned by swarm handler driven to completion"); - } - Err(err) => { - debug!("Error in processing: {:?}", err); - } - _ => break, - } - } - - // TODO: we never return `Ok(Ready)` because there's no way to know whether - // `next_incoming()` can produce anything more in the future - Ok(Async::NotReady) - } + /// The task to notify whenever we add a new element in one of the lists. + /// Necessary so that the task wakes up and the element gets polled. + task_to_notify: Option, } #[cfg(test)] mod tests { - use futures::future; - use transport::DeniedTransport; + use futures::{Future, future}; + use rand; + use transport::{self, DeniedTransport, Transport}; + use std::io::Error as IoError; + use std::sync::{atomic, Arc}; use swarm; + use tokio::runtime::current_thread; #[test] fn transport_error_propagation_listen() { @@ -360,4 +417,74 @@ mod tests { let addr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap(); assert!(swarm_ctrl.dial(addr, DeniedTransport).is_err()); } + + #[test] + fn basic_dial() { + let (tx, rx) = transport::connector(); + + let reached_tx = Arc::new(atomic::AtomicBool::new(false)); + let reached_tx2 = reached_tx.clone(); + + let reached_rx = Arc::new(atomic::AtomicBool::new(false)); + let reached_rx2 = reached_rx.clone(); + + let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), |_, _| { + reached_rx2.store(true, atomic::Ordering::SeqCst); + future::empty() + }); + swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap(); + + let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), |_, _| { + reached_tx2.store(true, atomic::Ordering::SeqCst); + future::empty() + }); + + let dial_success = swarm_ctrl2.dial("/memory".parse().unwrap(), tx).unwrap(); + let future = swarm_future2 + .select(swarm_future1).map(|_| ()).map_err(|(err, _)| err) + .select(dial_success).map(|_| ()).map_err(|(err, _)| err); + + current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + assert!(reached_tx.load(atomic::Ordering::SeqCst)); + assert!(reached_rx.load(atomic::Ordering::SeqCst)); + } + + #[test] + fn dial_multiple_times() { + let (tx, rx) = transport::connector(); + let reached = Arc::new(atomic::AtomicUsize::new(0)); + let reached2 = reached.clone(); + let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| { + reached2.fetch_add(1, atomic::Ordering::SeqCst); + future::empty() + }); + swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap(); + let num_dials = 20000 + rand::random::() % 20000; + let mut dials = Vec::new(); + for _ in 0 .. num_dials { + let f = swarm_ctrl.dial("/memory".parse().unwrap(), tx.clone()).unwrap(); + dials.push(f); + } + let future = future::join_all(dials) + .map(|_| ()) + .select(swarm_future) + .map_err(|(err, _)| err); + current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + assert_eq!(reached.load(atomic::Ordering::SeqCst), num_dials); + } + + #[test] + fn future_isnt_dropped() { + // Tests that the future in the closure isn't being dropped. + let (tx, rx) = transport::connector(); + let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| { + future::empty() + .then(|_: Result<(), ()>| -> Result<(), IoError> { panic!() }) // <-- the test + }); + swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap(); + let dial_success = swarm_ctrl.dial("/memory".parse().unwrap(), tx).unwrap(); + let future = dial_success.select(swarm_future) + .map_err(|(err, _)| err); + current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + } } diff --git a/core/src/transport/interruptible.rs b/core/src/transport/interruptible.rs new file mode 100644 index 00000000..f18282b4 --- /dev/null +++ b/core/src/transport/interruptible.rs @@ -0,0 +1,121 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{future, prelude::*, sync::oneshot}; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use transport::{MuxedTransport, Transport}; +use Multiaddr; + +/// See `Transport::interruptible`. +#[derive(Debug, Clone)] +pub struct Interruptible { + transport: T, + rx: future::Shared>, +} + +impl Interruptible { + /// Internal function that builds an `Interruptible`. + #[inline] + pub(crate) fn new(transport: T) -> (Interruptible, Interrupt) { + let (_tx, rx) = oneshot::channel(); + let transport = Interruptible { transport, rx: rx.shared() }; + let int = Interrupt { _tx }; + (transport, int) + } +} + +impl Transport for Interruptible +where + T: Transport, +{ + type Output = T::Output; + type MultiaddrFuture = T::MultiaddrFuture; + type Listener = T::Listener; + type ListenerUpgrade = T::ListenerUpgrade; + type Dial = InterruptibleDial; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + match self.transport.listen_on(addr) { + Ok(val) => Ok(val), + Err((transport, addr)) => Err((Interruptible { transport, rx: self.rx }, addr)), + } + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result { + match self.transport.dial(addr) { + Ok(future) => { + Ok(InterruptibleDial { + inner: future, + rx: self.rx, + }) + } + Err((transport, addr)) => Err((Interruptible { transport, rx: self.rx }, addr)), + } + } + + #[inline] + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.nat_traversal(server, observed) + } +} + +impl MuxedTransport for Interruptible +where + T: MuxedTransport, +{ + type Incoming = T::Incoming; + type IncomingUpgrade = T::IncomingUpgrade; + + #[inline] + fn next_incoming(self) -> Self::Incoming { + self.transport.next_incoming() + } +} + +/// Dropping this object interrupts the dialing of the corresponding `Interruptible`. +pub struct Interrupt { + _tx: oneshot::Sender<()>, +} + +pub struct InterruptibleDial { + inner: F, + rx: future::Shared>, +} + +impl Future for InterruptibleDial + where F: Future +{ + type Item = F::Item; + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(_)) | Err(_) => { + return Err(IoError::new(IoErrorKind::ConnectionAborted, "connection interrupted")); + }, + Ok(Async::NotReady) => (), + }; + + self.inner.poll() + } +} diff --git a/core/src/transport/mod.rs b/core/src/transport/mod.rs index c0e2048f..3f7f532f 100644 --- a/core/src/transport/mod.rs +++ b/core/src/transport/mod.rs @@ -39,6 +39,7 @@ pub mod and_then; pub mod choice; pub mod denied; pub mod dummy; +pub mod interruptible; pub mod map; pub mod map_err; pub mod memory; @@ -192,4 +193,13 @@ pub trait Transport { { DummyMuxing::new(self) } + + /// Wraps around the `Transport` and makes it interruptible. + #[inline] + fn interruptible(self) -> (interruptible::Interruptible, interruptible::Interrupt) + where + Self: Sized, + { + interruptible::Interruptible::new(self) + } } diff --git a/core/src/unique.rs b/core/src/unique.rs index 99d0dcf6..bbaf134c 100644 --- a/core/src/unique.rs +++ b/core/src/unique.rs @@ -18,12 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use fnv::FnvHashMap; use futures::{future, sync::oneshot, task, Async, Future, Poll, IntoFuture}; use parking_lot::Mutex; use {Multiaddr, MuxedTransport, SwarmController, Transport}; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::mem; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, Weak, atomic::AtomicUsize, atomic::Ordering}; +use transport::interruptible::Interrupt; /// Storage for a unique connection with a remote. pub struct UniqueConnec { @@ -36,10 +38,13 @@ enum UniqueConnecInner { /// We started dialing, but no response has been obtained so far. Pending { /// Tasks that need to be awakened when the content of this object is set. - tasks_waiting: Vec, - /// Future that represents when `set_until` should have been called. + tasks_waiting: FnvHashMap, + /// Future that represents when `tie_*` should have been called. // TODO: Send + Sync bound is meh dial_fut: Box + Send + Sync>, + /// Dropping this object will automatically interrupt the dial, which is very useful if + /// we clear or drop the `UniqueConnec`. + interrupt: Interrupt, }, /// The value of this unique connec has been set. /// Can only transition to `Empty` when the future has expired. @@ -85,69 +90,127 @@ impl UniqueConnec { /// Loads the value from the object. /// - /// If the object is empty, dials the given multiaddress with the given transport. + /// If the object is empty or has errored earlier, dials the given multiaddress with the + /// given transport. /// - /// The closure of the `swarm` is expected to call `set_until()` on the `UniqueConnec`. Failure + /// The closure of the `swarm` is expected to call `tie_*()` on the `UniqueConnec`. Failure /// to do so will make the `UniqueConnecFuture` produce an error. - pub fn get_or_dial(&self, swarm: &SwarmController, multiaddr: &Multiaddr, + /// + /// One critical property of this method, is that if a connection incomes and `tie_*` is + /// called, then it will be returned by the returned future. + #[inline] + pub fn dial(&self, swarm: &SwarmController, multiaddr: &Multiaddr, transport: Du) -> UniqueConnecFuture - where T: Clone, + where T: Clone + 'static, // TODO: 'static :-/ Du: Transport + 'static, // TODO: 'static :-/ Du::Output: Into, S: Clone + MuxedTransport, { - self.get(|| { - swarm.dial(multiaddr.clone(), transport) - .map_err(|_| IoError::new(IoErrorKind::Other, "multiaddress not supported")) - .into_future() - .flatten() - }) + self.dial_inner(swarm, multiaddr, transport, true) } - /// Loads the value from the object. - /// - /// If the object is empty, calls the closure. The closure should return a future that - /// should be signaled after `set_until` has been called. If the future produces an error, - /// then the object will empty itself again and the `UniqueConnecFuture` will return an error. - /// If the future is finished and `set_until` hasn't been called, then the `UniqueConnecFuture` - /// will return an error. - pub fn get(&self, or: F) -> UniqueConnecFuture - where F: FnOnce() -> Fut, - T: Clone, - Fut: IntoFuture, - Fut::Future: Send + Sync + 'static, // TODO: 'static :-/ + /// Same as `dial`, except that the future will produce an error if an earlier attempt to dial + /// has errored. + #[inline] + pub fn dial_if_empty(&self, swarm: &SwarmController, multiaddr: &Multiaddr, + transport: Du) -> UniqueConnecFuture + where T: Clone + 'static, // TODO: 'static :-/ + Du: Transport + 'static, // TODO: 'static :-/ + Du::Output: Into, + S: Clone + MuxedTransport, { - match &*self.inner.lock() { + self.dial_inner(swarm, multiaddr, transport, false) + } + + /// Inner implementation of `dial_*`. + fn dial_inner(&self, swarm: &SwarmController, multiaddr: &Multiaddr, + transport: Du, dial_if_err: bool) -> UniqueConnecFuture + where T: Clone + 'static, // TODO: 'static :-/ + Du: Transport + 'static, // TODO: 'static :-/ + Du::Output: Into, + S: Clone + MuxedTransport, + { + let mut inner = self.inner.lock(); + match &*inner { UniqueConnecInner::Empty => (), + UniqueConnecInner::Errored(_) if dial_if_err => (), _ => return UniqueConnecFuture { inner: Arc::downgrade(&self.inner) }, }; - // The mutex is unlocked when we call `or`, in order to avoid potential deadlocks. - let dial_fut = or().into_future(); + let weak_inner = Arc::downgrade(&self.inner); - let mut inner = self.inner.lock(); - // Since we unlocked the mutex, it's possible that the object was filled in the meanwhile. - // Therefore we check again whether it's still `Empty`. - if let UniqueConnecInner::Empty = &mut *inner { - *inner = UniqueConnecInner::Pending { - tasks_waiting: Vec::new(), - dial_fut: Box::new(dial_fut), - }; - } + let (transport, interrupt) = transport.interruptible(); + let dial_fut = swarm.dial_then(multiaddr.clone(), transport, + move |val: Result<(), IoError>| { + let inner = match weak_inner.upgrade() { + Some(i) => i, + None => return val + }; + + let mut inner = inner.lock(); + if let UniqueConnecInner::Full { .. } = *inner { + return val; + } + + let new_val = UniqueConnecInner::Errored(match val { + Ok(()) => IoError::new(IoErrorKind::ConnectionRefused, + "dialing has succeeded but tie_* hasn't been called"), + Err(ref err) => IoError::new(err.kind(), err.to_string()), + }); + + match mem::replace(&mut *inner, new_val) { + UniqueConnecInner::Pending { tasks_waiting, .. } => { + for task in tasks_waiting { + task.1.notify(); + } + }, + _ => () + }; + + val + }); + + let dial_fut = dial_fut + .map_err(|_| IoError::new(IoErrorKind::Other, "multiaddress not supported")) + .into_future() + .flatten(); + + *inner = UniqueConnecInner::Pending { + tasks_waiting: Default::default(), + dial_fut: Box::new(dial_fut), + interrupt, + }; UniqueConnecFuture { inner: Arc::downgrade(&self.inner) } } - /// Puts `value` inside the object. The second parameter is a future whose completion will - /// clear up the content. Returns an adjusted version of that same future. + /// Puts `value` inside the object. + /// Additionally, the `UniqueConnec` will be tied to the `until` future. When the future drops + /// or finishes, the `UniqueConnec` is automatically cleared. If the `UniqueConnec` is cleared + /// by the user, the future automatically stops. + /// The returned future is an adjusted version of that same future. /// - /// If `clear()` is called, the returned future will automatically complete with an error. - /// - /// Has no effect if the object already contains something. - pub fn set_until(&self, value: T, until: F) -> impl Future + /// If the object already contains something, then `until` is dropped and a dummy future that + /// immediately ends is returned. + pub fn tie_or_stop(&self, value: T, until: F) -> impl Future where F: Future { - let mut tasks_to_notify = Vec::new(); + self.tie_inner(value, until, false) + } + + /// Same as `tie_or_stop`, except that is if the object already contains something, then + /// `until` is returned immediately and can live in parallel. + pub fn tie_or_passthrough(&self, value: T, until: F) -> impl Future + where F: Future + { + self.tie_inner(value, until, true) + } + + /// Inner implementation of `tie_*`. + fn tie_inner(&self, value: T, until: F, pass_through: bool) -> impl Future + where F: Future + { + let mut tasks_to_notify = Default::default(); let mut inner = self.inner.lock(); let (on_clear, on_clear_rx) = oneshot::channel(); @@ -160,23 +223,37 @@ impl UniqueConnec { old @ UniqueConnecInner::Full { .. } => { // Keep the old value. *inner = old; - return future::Either::B(until); + if pass_through { + return future::Either::B(future::Either::A(until)); + } else { + return future::Either::B(future::Either::B(future::ok(()))); + } }, }; drop(inner); + struct Cleaner(Weak>>); + impl Drop for Cleaner { + #[inline] + fn drop(&mut self) { + if let Some(inner) = self.0.upgrade() { + *inner.lock() = UniqueConnecInner::Empty; + } + } + } + let cleaner = Cleaner(Arc::downgrade(&self.inner)); + // The mutex is unlocked when we notify the pending tasks. for task in tasks_to_notify { - task.notify(); + task.1.notify(); } - let inner = self.inner.clone(); let fut = until .select(on_clear_rx.then(|_| Ok(()))) .map(|((), _)| ()) .map_err(|(err, _)| err) .then(move |val| { - *inner.lock() = UniqueConnecInner::Empty; + drop(cleaner); // Make sure that `cleaner` gets called there. val }); future::Either::A(fut) @@ -185,7 +262,7 @@ impl UniqueConnec { /// Clears the content of the object. /// /// Has no effect if the content is empty or pending. - /// If the node was full, calling `clear` will stop the future returned by `set_until`. + /// If the node was full, calling `clear` will stop the future returned by `tie_*`. pub fn clear(&self) { let mut inner = self.inner.lock(); match mem::replace(&mut *inner, UniqueConnecInner::Empty) { @@ -195,6 +272,12 @@ impl UniqueConnec { *inner = pending; }, UniqueConnecInner::Full { on_clear, .. } => { + // TODO: Should we really replace the `Full` with an `Empty` here? What about + // letting dropping the future clear the connection automatically? Otherwise + // it is possible that the user dials before the future gets dropped, in which + // case the future dropping will set the value to `Empty`. But on the other hand, + // it is expected that `clear()` is instantaneous and if it is followed with + // `dial()` then it should dial. let _ = on_clear.send(()); }, }; @@ -242,7 +325,23 @@ impl Default for UniqueConnec { } } -/// Future returned by `UniqueConnec::get()`. +impl Drop for UniqueConnec { + fn drop(&mut self) { + // Notify the waiting futures if we are the last `UniqueConnec`. + if let Some(inner) = Arc::get_mut(&mut self.inner) { + match *inner.get_mut() { + UniqueConnecInner::Pending { ref mut tasks_waiting, .. } => { + for task in tasks_waiting.drain() { + task.1.notify(); + } + }, + _ => () + } + } + } +} + +/// Future returned by `UniqueConnec::dial()`. pub struct UniqueConnecFuture { inner: Weak>>, } @@ -263,28 +362,32 @@ impl Future for UniqueConnecFuture let mut inner = inner.lock(); match mem::replace(&mut *inner, UniqueConnecInner::Empty) { UniqueConnecInner::Empty => { - // This can happen if `set_until()` is called, and the future expires before the - // future returned by `get()` gets polled. This means that the connection has been + // This can happen if `tie_*()` is called, and the future expires before the + // future returned by `dial()` gets polled. This means that the connection has been // closed. Err(IoErrorKind::ConnectionAborted.into()) }, - UniqueConnecInner::Pending { mut tasks_waiting, mut dial_fut } => { + UniqueConnecInner::Pending { mut tasks_waiting, mut dial_fut, interrupt } => { match dial_fut.poll() { Ok(Async::Ready(())) => { // This happens if we successfully dialed a remote, but the callback - // doesn't call `set_until`. This can be a logic error by the user, + // doesn't call `tie_*`. This can be a logic error by the user, // but could also indicate that the user decided to filter out this // connection for whatever reason. *inner = UniqueConnecInner::Errored(IoErrorKind::ConnectionAborted.into()); Err(IoErrorKind::ConnectionAborted.into()) }, Ok(Async::NotReady) => { - tasks_waiting.push(task::current()); - *inner = UniqueConnecInner::Pending { tasks_waiting, dial_fut }; + static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); + task_local! { + static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) + } + tasks_waiting.insert(TASK_ID.with(|&k| k), task::current()); + *inner = UniqueConnecInner::Pending { tasks_waiting, dial_fut, interrupt }; Ok(Async::NotReady) } Err(err) => { - let tr = IoError::new(IoErrorKind::ConnectionAborted, err.to_string()); + let tr = IoError::new(err.kind(), err.to_string()); *inner = UniqueConnecInner::Errored(err); Err(tr) }, @@ -298,7 +401,7 @@ impl Future for UniqueConnecFuture Ok(Async::Ready(value)) }, UniqueConnecInner::Errored(err) => { - let tr = IoError::new(IoErrorKind::ConnectionAborted, err.to_string()); + let tr = IoError::new(err.kind(), err.to_string()); *inner = UniqueConnecInner::Errored(err); Err(tr) }, @@ -311,35 +414,321 @@ impl Future for UniqueConnecFuture pub enum UniqueConnecState { /// The object is empty. Empty, - /// `get_*` has been called and we are waiting for `set_until` to be called. + /// `dial` has been called and we are waiting for `tie_*` to be called. Pending, - /// `set_until` has been called. + /// `tie_*` has been called. Full, - /// The future returned by the closure of `get_*` has errored or has finished before - /// `set_until` has been called. + /// The future returned by the closure of `dial` has errored or has finished before + /// `tie_*` has been called. Errored, } #[cfg(test)] mod tests { - use futures::{future, Future}; + use futures::{future, sync::oneshot, Future}; use transport::DeniedTransport; + use std::io::Error as IoError; + use std::sync::{Arc, atomic}; + use std::time::Duration; use {UniqueConnec, UniqueConnecState}; - use swarm; + use {swarm, transport, Transport}; + use tokio::runtime::current_thread; + use tokio_timer; + + #[test] + fn basic_working() { + // Checks the basic working of the `UniqueConnec`. + let (tx, rx) = transport::connector(); + let unique_connec = UniqueConnec::empty(); + let unique_connec2 = unique_connec.clone(); + assert_eq!(unique_connec.state(), UniqueConnecState::Empty); + + let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| { + // Note that this handles both the dial and the listen. + assert!(unique_connec2.is_alive()); + unique_connec2.tie_or_stop(12, future::empty()) + }); + swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap(); + + let dial_success = unique_connec + .dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx) + .map(|val| { assert_eq!(val, 12); }); + assert_eq!(unique_connec.state(), UniqueConnecState::Pending); + + let future = dial_success.select(swarm_future).map_err(|(err, _)| err); + current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + assert_eq!(unique_connec.state(), UniqueConnecState::Full); + } #[test] fn invalid_multiaddr_produces_error() { + // Tests that passing an invalid multiaddress generates an error. let unique = UniqueConnec::empty(); assert_eq!(unique.state(), UniqueConnecState::Empty); let unique2 = unique.clone(); let (swarm_ctrl, _swarm_fut) = swarm(DeniedTransport, |_, _| { - unique2.set_until((), future::empty()) + unique2.tie_or_stop((), future::empty()) }); - let fut = unique.get_or_dial(&swarm_ctrl, &"/ip4/1.2.3.4".parse().unwrap(), - DeniedTransport); + let fut = unique.dial(&swarm_ctrl, &"/ip4/1.2.3.4".parse().unwrap(), DeniedTransport); assert!(fut.wait().is_err()); assert_eq!(unique.state(), UniqueConnecState::Errored); } - // TODO: more tests + #[test] + fn tie_or_stop_stops() { + // Tests that `tie_or_stop` destroys additional futures passed to it. + let (tx, rx) = transport::connector(); + let unique_connec = UniqueConnec::empty(); + let unique_connec2 = unique_connec.clone(); + + // This channel is used to detect whether the future has been dropped. + let (msg_tx, msg_rx) = oneshot::channel(); + + let mut num_connec = 0; + let mut msg_rx = Some(msg_rx); + let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| { + num_connec += 1; + if num_connec == 1 { + unique_connec2.tie_or_stop(12, future::Either::A(future::empty())) + } else { + let fut = msg_rx.take().unwrap().map_err(|_| panic!()); + unique_connec2.tie_or_stop(13, future::Either::B(fut)) + } + }); + swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap(); + + let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| { + future::empty() + }); + + let dial_success = unique_connec + .dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx.clone()) + .map(|val| { assert_eq!(val, 12); }) + .inspect({ + let c = unique_connec.clone(); + move |_| { assert!(c.is_alive()); } + }) + .and_then(|_| { + tokio_timer::sleep(Duration::from_secs(1)) + .map_err(|_| unreachable!()) + }) + .and_then(move |_| { + swarm_ctrl2.dial("/memory".parse().unwrap(), tx) + .unwrap_or_else(|_| panic!()) + }) + .inspect({ + let c = unique_connec.clone(); + move |_| { + assert_eq!(c.poll(), Some(12)); // Not 13 + assert!(msg_tx.send(()).is_err()); + } + }); + + let future = dial_success + .select(swarm_future2).map(|_| ()).map_err(|(err, _)| err) + .select(swarm_future1).map(|_| ()).map_err(|(err, _)| err); + + current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + assert!(unique_connec.is_alive()); + } + + #[test] + fn tie_or_passthrough_passes_through() { + // Tests that `tie_or_passthrough` doesn't delete additional futures passed to it when + // it is already full, and doesn't gets its value modified when that happens. + let (tx, rx) = transport::connector(); + let unique_connec = UniqueConnec::empty(); + let unique_connec2 = unique_connec.clone(); + + let mut num = 12; + let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), move |_, _| { + // Note that this handles both the dial and the listen. + let fut = future::empty().then(|_: Result<(), ()>| -> Result<(), IoError> { panic!() }); + num += 1; + unique_connec2.tie_or_passthrough(num, fut) + }); + swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap(); + + let dial_success = unique_connec + .dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx.clone()) + .map(|val| { assert_eq!(val, 13); }); + + swarm_ctrl.dial("/memory".parse().unwrap(), tx) + .unwrap(); + + let future = dial_success.select(swarm_future).map_err(|(err, _)| err); + current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + assert_eq!(unique_connec.poll(), Some(13)); + } + + #[test] + fn cleared_when_future_drops() { + // Tests that the `UniqueConnec` gets cleared when the future we associate with it gets + // destroyed. + let (tx, rx) = transport::connector(); + let unique_connec = UniqueConnec::empty(); + let unique_connec2 = unique_connec.clone(); + + let (msg_tx, msg_rx) = oneshot::channel(); + let mut msg_rx = Some(msg_rx); + + let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| { + future::empty() + }); + swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap(); + + let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| { + let fut = msg_rx.take().unwrap().map_err(|_| -> IoError { unreachable!() }); + unique_connec2.tie_or_stop(12, fut) + }); + + let dial_success = unique_connec + .dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx) + .map(|val| { assert_eq!(val, 12); }) + .inspect({ + let c = unique_connec.clone(); + move |_| { assert!(c.is_alive()); } + }) + .and_then(|_| { + msg_tx.send(()).unwrap(); + tokio_timer::sleep(Duration::from_secs(1)) + .map_err(|_| unreachable!()) + }) + .inspect({ + let c = unique_connec.clone(); + move |_| { assert!(!c.is_alive()); } + }); + + let future = dial_success + .select(swarm_future1).map(|_| ()).map_err(|(err, _)| err) + .select(swarm_future2).map(|_| ()).map_err(|(err, _)| err); + + current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + assert!(!unique_connec.is_alive()); + } + + #[test] + fn future_drops_when_cleared() { + // Tests that the future returned by `tie_or_*` ends when the `UniqueConnec` get cleared. + let (tx, rx) = transport::connector(); + let unique_connec = UniqueConnec::empty(); + let unique_connec2 = unique_connec.clone(); + + let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| { + future::empty() + }); + swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap(); + + let finished = Arc::new(atomic::AtomicBool::new(false)); + let finished2 = finished.clone(); + let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| { + let finished2 = finished2.clone(); + unique_connec2.tie_or_stop(12, future::empty()).then(move |v| { + finished2.store(true, atomic::Ordering::Relaxed); + v + }) + }); + + let dial_success = unique_connec + .dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx) + .map(|val| { assert_eq!(val, 12); }) + .inspect({ + let c = unique_connec.clone(); + move |_| { + assert!(c.is_alive()); + c.clear(); + assert!(!c.is_alive()); + } + }) + .and_then(|_| { + tokio_timer::sleep(Duration::from_secs(1)) + .map_err(|_| unreachable!()) + }) + .inspect({ + let c = unique_connec.clone(); + move |_| { + assert!(finished.load(atomic::Ordering::Relaxed)); + assert!(!c.is_alive()); + } + }); + + let future = dial_success + .select(swarm_future1).map(|_| ()).map_err(|(err, _)| err) + .select(swarm_future2).map(|_| ()).map_err(|(err, _)| err); + + current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + assert!(!unique_connec.is_alive()); + } + + #[test] + fn future_drops_when_destroyed() { + // Tests that the future returned by `tie_or_*` ends when the `UniqueConnec` get dropped. + let (tx, rx) = transport::connector(); + let unique_connec = UniqueConnec::empty(); + let mut unique_connec2 = Some(unique_connec.clone()); + + let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| { + future::empty() + }); + swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap(); + + let finished = Arc::new(atomic::AtomicBool::new(false)); + let finished2 = finished.clone(); + let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| { + let finished2 = finished2.clone(); + unique_connec2.take().unwrap().tie_or_stop(12, future::empty()).then(move |v| { + finished2.store(true, atomic::Ordering::Relaxed); + v + }) + }); + + let dial_success = unique_connec + .dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx) + .map(|val| { assert_eq!(val, 12); }) + .inspect(move |_| { + assert!(unique_connec.is_alive()); + drop(unique_connec); + }) + .and_then(|_| { + tokio_timer::sleep(Duration::from_secs(1)) + .map_err(|_| unreachable!()) + }) + .inspect(move |_| { + assert!(finished.load(atomic::Ordering::Relaxed)); + }); + + let future = dial_success + .select(swarm_future1).map(|_| ()).map_err(|(err, _)| err) + .select(swarm_future2).map(|_| ()).map_err(|(err, _)| err); + + current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + } + + #[test] + fn error_if_unique_connec_destroyed_before_future() { + // Tests that the future returned by `dial` returns an error if the `UniqueConnec` no + // longer exists. + let (tx, rx) = transport::connector(); + + let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), move |_, _| { + future::empty() + }); + swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap(); + + let unique_connec = UniqueConnec::empty(); + let dial_success = unique_connec + .dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx) + .then(|val: Result<(), IoError>| { + assert!(val.is_err()); + Ok(()) + }); + drop(unique_connec); + + let future = dial_success + .select(swarm_future).map(|_| ()).map_err(|(err, _)| err); + current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + } + + // TODO: test that dialing is interrupted when UniqueConnec is cleared + // TODO: test that dialing is interrupted when UniqueConnec is dropped } diff --git a/libp2p/examples/kademlia.rs b/libp2p/examples/kademlia.rs index 9720d9c9..29dfa22c 100644 --- a/libp2p/examples/kademlia.rs +++ b/libp2p/examples/kademlia.rs @@ -176,7 +176,7 @@ fn main() { active_kad_connections .entry(node_id) .or_insert_with(Default::default) - .set_until(kad_ctrl, fut) + .tie_or_passthrough(kad_ctrl, fut) }) } } @@ -195,7 +195,7 @@ fn main() { let addr = Multiaddr::from(libp2p::multiaddr::AddrComponent::P2P(cid)); active_kad_connections.lock().unwrap().entry(peer.clone()) .or_insert_with(Default::default) - .get_or_dial(&swarm_controller, &addr, transport.clone().with_upgrade(KadConnecConfig::new())) + .dial(&swarm_controller, &addr, transport.clone().with_upgrade(KadConnecConfig::new())) }) .filter_map(move |event| { match event {