Remove two unused files (#567)

This commit is contained in:
Pierre Krieger
2018-10-17 12:26:42 +01:00
committed by GitHub
parent 5d1c54cc10
commit 840663e181
2 changed files with 0 additions and 1328 deletions

View File

@ -1,569 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::stream::StreamFuture;
use futures::sync::oneshot;
use futures::{Async, Future, IntoFuture, Poll, Stream};
use futures::task;
use parking_lot::Mutex;
use std::fmt;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::Arc;
use {Multiaddr, MuxedTransport, Transport};
/// Creates a swarm.
///
/// Requires an upgraded transport, and a function or closure that will turn the upgrade into a
/// `Future` that produces a `()`.
///
/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to
/// control, and the `Future` must be driven to completion in order for things to work.
pub fn swarm<T, H, F>(
transport: T,
handler: H,
) -> (SwarmController<T, F::Future>, SwarmEvents<T, F::Future, H>)
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>) -> F,
F: IntoFuture<Item = (), Error = IoError>,
{
let shared = Arc::new(Mutex::new(Shared {
next_incoming: transport.clone().next_incoming(),
listeners: Vec::new(),
listeners_upgrade: Vec::new(),
dialers: Vec::new(),
to_process: Vec::new(),
task_to_notify: None,
}));
let future = SwarmEvents {
transport: transport.clone(),
shared: shared.clone(),
handler: handler,
};
let controller = SwarmController {
transport,
shared,
};
(controller, future)
}
/// Allows control of what the swarm is doing.
pub struct SwarmController<T, F>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
{
/// Shared between the swarm infrastructure.
shared: Arc<Mutex<Shared<T, F>>>,
/// Transport used to dial or listen.
transport: T,
}
impl<T, F> fmt::Debug for SwarmController<T, F>
where
T: fmt::Debug + MuxedTransport + 'static, // TODO: 'static :-/
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("SwarmController")
.field(&self.transport)
.finish()
}
}
impl<T, F> Clone for SwarmController<T, F>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
{
fn clone(&self) -> Self {
SwarmController {
transport: self.transport.clone(),
shared: self.shared.clone(),
}
}
}
impl<T, F> SwarmController<T, F>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
T::Dial: Send,
T::MultiaddrFuture: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::Output: Send,
F: 'static,
{
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
/// upgraded using the `upgrade`, and the output is sent to the handler that was passed when
/// calling `swarm`.
///
/// Returns a future that is signalled once the closure in the `swarm` has returned its future.
/// Therefore if the closure in the swarm has some side effect (eg. write something in a
/// variable), this side effect will be observable when this future succeeds.
#[inline]
pub fn dial<Du>(&self, multiaddr: Multiaddr, transport: Du)
-> Result<impl Future<Item = (), Error = IoError>, Multiaddr>
where
Du: Transport + 'static, // TODO: 'static :-/
Du::Dial: Send,
Du::MultiaddrFuture: Send,
Du::Output: Into<T::Output>,
{
self.dial_then(multiaddr, transport, |v| v)
}
/// Internal version of `dial` that allows adding a closure that is called after either the
/// dialing fails or the handler has been called with the resulting future.
///
/// The returned future is filled with the output of `then`.
pub(crate) fn dial_then<Du, TThen>(&self, multiaddr: Multiaddr, transport: Du, then: TThen)
-> Result<impl Future<Item = (), Error = IoError>, Multiaddr>
where
Du: Transport + 'static, // TODO: 'static :-/
Du::Dial: Send,
Du::MultiaddrFuture: Send,
Du::Output: Into<T::Output>,
TThen: FnOnce(Result<(), IoError>) -> Result<(), IoError> + Send + 'static,
{
trace!("Swarm dialing {}", multiaddr);
match transport.dial(multiaddr.clone()) {
Ok(dial) => {
let (tx, rx) = oneshot::channel();
let mut then = Some(move |val| {
let _ = tx.send(then(val));
});
// Unfortunately the `Box<FnOnce(_)>` type is still unusable in Rust right now,
// so we use a `Box<FnMut(_)>` instead and panic if it is called multiple times.
let mut then = Box::new(move |val: Result<(), IoError>| {
let then = then.take().expect("The Boxed FnMut should only be called once");
then(val);
}) as Box<FnMut(_) + Send>;
let dial = dial.then(|result| {
match result {
Ok((output, client_addr)) => {
let client_addr = Box::new(client_addr) as Box<Future<Item = _, Error = _> + Send>;
Ok((output.into(), then, client_addr))
}
Err(err) => {
debug!("Error in dialer upgrade: {:?}", err);
let err_clone = IoError::new(err.kind(), err.to_string());
then(Err(err));
Err(err_clone)
}
}
});
let mut shared = self.shared.lock();
shared.dialers.push((multiaddr, Box::new(dial) as Box<_>));
if let Some(task) = shared.task_to_notify.take() {
task.notify();
}
Ok(rx.then(|result| {
match result {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(_) => Err(IoError::new(IoErrorKind::ConnectionAborted,
"dial cancelled the swarm future has been destroyed")),
}
}))
}
Err((_, multiaddr)) => Err(multiaddr),
}
}
/// Interrupts all dialing attempts to a specific multiaddress.
///
/// Has no effect if the dialing attempt has already succeeded, in which case it will be
/// dispatched to the handler.
pub fn interrupt_dial(&self, multiaddr: &Multiaddr) {
let mut shared = self.shared.lock();
shared.dialers.retain(|dialer| {
&dialer.0 != multiaddr
});
}
/// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that
/// was passed to `swarm`.
// TODO: add a way to cancel a listener
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
match self.transport.clone().listen_on(multiaddr) {
Ok((listener, new_addr)) => {
trace!("Swarm listening on {}", new_addr);
let mut shared = self.shared.lock();
let listener = Box::new(
listener.map(|f| {
let f = f.map(|(out, maf)| {
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError> + Send>)
});
Box::new(f) as Box<Future<Item = _, Error = _> + Send>
}),
) as Box<Stream<Item = _, Error = _> + Send>;
shared.listeners.push((new_addr.clone(), listener.into_future()));
if let Some(task) = shared.task_to_notify.take() {
task.notify();
}
Ok(new_addr)
}
Err((_, multiaddr)) => Err(multiaddr),
}
}
}
/// Future that must be driven to completion in order for the swarm to work.
#[must_use = "futures do nothing unless polled"]
pub struct SwarmEvents<T, F, H>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
{
/// Shared between the swarm infrastructure.
shared: Arc<Mutex<Shared<T, F>>>,
/// The transport used to dial.
transport: T,
/// Swarm handler.
handler: H,
}
impl<T, H, If, F> Stream for SwarmEvents<T, F, H>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
T::MultiaddrFuture: Send,
T::IncomingUpgrade: Send,
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>) -> If,
If: IntoFuture<Future = F, Item = (), Error = IoError>,
F: Future<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
{
type Item = SwarmEvent<F>;
type Error = IoError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut shared = self.shared.lock();
let handler = &mut self.handler;
loop {
match shared.next_incoming.poll() {
Ok(Async::Ready(connec)) => {
debug!("Swarm received new multiplexed incoming connection");
shared.next_incoming = self.transport.clone().next_incoming();
let connec = connec.map(|(out, maf)| {
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError> + Send>)
});
shared.listeners_upgrade.push(Box::new(connec) as Box<_>);
}
Ok(Async::NotReady) => break,
Err(err) => {
// TODO: should that stop everything?
debug!("Error in multiplexed incoming connection: {:?}", err);
shared.next_incoming = self.transport.clone().next_incoming();
return Ok(Async::Ready(Some(SwarmEvent::IncomingError(err))));
}
}
}
// We remove each element from `shared.listeners` one by one and add them back only
// if relevant.
for n in (0 .. shared.listeners.len()).rev() {
let (listen_addr, mut listener) = shared.listeners.swap_remove(n);
loop {
match listener.poll() {
Ok(Async::Ready((Some(upgrade), remaining))) => {
trace!("Swarm received new connection on listener socket");
shared.listeners_upgrade.push(upgrade);
listener = remaining.into_future();
}
Ok(Async::Ready((None, _))) => {
debug!("Listener closed gracefully");
return Ok(Async::Ready(Some(SwarmEvent::ListenerClosed {
listen_addr
})));
},
Err((error, _)) => {
debug!("Error in listener: {:?}", error);
return Ok(Async::Ready(Some(SwarmEvent::ListenerError {
listen_addr,
error,
})));
}
Ok(Async::NotReady) => {
shared.listeners.push((listen_addr, listener));
break;
}
}
}
}
// We remove each element from `shared.listeners_upgrade` one by one and add them back
// only if relevant.
for n in (0 .. shared.listeners_upgrade.len()).rev() {
let mut listener_upgrade = shared.listeners_upgrade.swap_remove(n);
match listener_upgrade.poll() {
Ok(Async::Ready((output, client_addr))) => {
debug!("Successfully upgraded incoming connection");
// TODO: unlock mutex before calling handler, in order to avoid deadlocks if
// the user does something stupid
shared.to_process.push(handler(output, client_addr).into_future());
}
Err(err) => {
debug!("Error in listener upgrade: {:?}", err);
return Ok(Async::Ready(Some(SwarmEvent::ListenerUpgradeError(err))));
}
Ok(Async::NotReady) => {
shared.listeners_upgrade.push(listener_upgrade);
},
}
}
// We remove each element from `shared.dialers` one by one and add them back only
// if relevant.
for n in (0 .. shared.dialers.len()).rev() {
let (client_addr, mut dialer) = shared.dialers.swap_remove(n);
match dialer.poll() {
Ok(Async::Ready((output, mut notifier, addr))) => {
trace!("Successfully upgraded dialed connection");
// TODO: unlock mutex before calling handler, in order to avoid deadlocks if
// the user does something stupid
shared.to_process.push(handler(output, addr).into_future());
notifier(Ok(()));
}
Err(error) => {
return Ok(Async::Ready(Some(SwarmEvent::DialFailed {
client_addr,
error,
})));
},
Ok(Async::NotReady) => {
shared.dialers.push((client_addr, dialer));
},
}
}
// We remove each element from `shared.to_process` one by one and add them back only
// if relevant.
for n in (0 .. shared.to_process.len()).rev() {
let mut to_process = shared.to_process.swap_remove(n);
match to_process.poll() {
Ok(Async::Ready(())) => {
trace!("Future returned by swarm handler driven to completion");
return Ok(Async::Ready(Some(SwarmEvent::HandlerFinished {
handler_future: to_process,
})));
}
Err(error) => {
debug!("Error in processing: {:?}", error);
return Ok(Async::Ready(Some(SwarmEvent::HandlerError {
handler_future: to_process,
error,
})));
}
Ok(Async::NotReady) => {
shared.to_process.push(to_process);
}
}
}
// TODO: we never return `Ok(Ready)` because there's no way to know whether
// `next_incoming()` can produce anything more in the future ; also we would need to
// know when the controller has been dropped
shared.task_to_notify = Some(task::current());
Ok(Async::NotReady)
}
}
// TODO: stronger typing
struct Shared<T, F> where T: MuxedTransport + 'static {
/// Next incoming substream on the transport.
next_incoming: T::Incoming,
/// All the active listeners.
listeners: Vec<(
Multiaddr,
StreamFuture<
Box<
Stream<
Item = Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>), Error = IoError> + Send>,
Error = IoError,
> + Send,
>,
>,
)>,
/// Futures that upgrade an incoming listening connection to a full connection.
listeners_upgrade:
Vec<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>), Error = IoError> + Send>>,
/// Futures that dial a remote address.
///
/// Contains the address we dial, so that we can cancel it if necessary.
dialers: Vec<(Multiaddr, Box<Future<Item = (T::Output, Box<FnMut(Result<(), IoError>) + Send>, Box<Future<Item = Multiaddr, Error = IoError> + Send>), Error = IoError> + Send>)>,
/// List of futures produced by the swarm closure. Must be processed to the end.
to_process: Vec<F>,
/// The task to notify whenever we add a new element in one of the lists.
/// Necessary so that the task wakes up and the element gets polled.
task_to_notify: Option<task::Task>,
}
/// Event that happens in the swarm.
#[derive(Debug)]
pub enum SwarmEvent<F> {
/// An error has happened while polling the muxed transport for incoming connections.
IncomingError(IoError),
/// A listener has gracefully closed.
ListenerClosed {
/// Address the listener was listening on.
listen_addr: Multiaddr,
},
/// A listener has stopped because it produced an error.
ListenerError {
/// Address the listener was listening on.
listen_addr: Multiaddr,
/// The error that happened.
error: IoError,
},
/// An error happened while upgrading an incoming connection.
ListenerUpgradeError(IoError),
/// Failed to dial a remote address.
DialFailed {
/// Address we were trying to dial.
client_addr: Multiaddr,
/// Error that happened.
error: IoError,
},
/// A future returned by the handler has finished.
HandlerFinished {
/// The future originally returned by the handler.
handler_future: F,
},
/// A future returned by the handler has produced an error.
HandlerError {
/// The future originally returned by the handler.
handler_future: F,
/// The error that happened.
error: IoError,
},
}
#[cfg(test)]
mod tests {
use futures::{Future, Stream, future};
use rand;
use transport::{self, DeniedTransport, Transport};
use std::io::Error as IoError;
use std::sync::{atomic, Arc};
use swarm;
use tokio::runtime::current_thread;
#[test]
fn transport_error_propagation_listen() {
let (swarm_ctrl, _swarm_future) = swarm(DeniedTransport, |_, _| future::empty());
assert!(swarm_ctrl.listen_on("/ip4/127.0.0.1/tcp/10000".parse().unwrap()).is_err());
}
#[test]
fn transport_error_propagation_dial() {
let (swarm_ctrl, _swarm_future) = swarm(DeniedTransport, |_, _| future::empty());
let addr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap();
assert!(swarm_ctrl.dial(addr, DeniedTransport).is_err());
}
#[test]
fn basic_dial() {
let (tx, rx) = transport::connector();
let reached_tx = Arc::new(atomic::AtomicBool::new(false));
let reached_tx2 = reached_tx.clone();
let reached_rx = Arc::new(atomic::AtomicBool::new(false));
let reached_rx2 = reached_rx.clone();
let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), |_, _| {
reached_rx2.store(true, atomic::Ordering::SeqCst);
future::empty()
});
swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap();
let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), |_, _| {
reached_tx2.store(true, atomic::Ordering::SeqCst);
future::empty()
});
let dial_success = swarm_ctrl2.dial("/memory".parse().unwrap(), tx).unwrap();
let future = swarm_future2.for_each(|_| Ok(()))
.select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err)
.select(dial_success).map(|_| ()).map_err(|(err, _)| err);
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
assert!(reached_tx.load(atomic::Ordering::SeqCst));
assert!(reached_rx.load(atomic::Ordering::SeqCst));
}
#[test]
fn dial_multiple_times() {
let (tx, rx) = transport::connector();
let reached = Arc::new(atomic::AtomicUsize::new(0));
let reached2 = reached.clone();
let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| {
reached2.fetch_add(1, atomic::Ordering::SeqCst);
future::empty()
});
swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap();
let num_dials = 20000 + rand::random::<usize>() % 20000;
let mut dials = Vec::new();
for _ in 0 .. num_dials {
let f = swarm_ctrl.dial("/memory".parse().unwrap(), tx.clone()).unwrap();
dials.push(f);
}
let future = future::join_all(dials)
.map(|_| ())
.select(swarm_future.for_each(|_| Ok(())))
.map_err(|(err, _)| err);
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
assert_eq!(reached.load(atomic::Ordering::SeqCst), num_dials);
}
#[test]
fn future_isnt_dropped() {
// Tests that the future in the closure isn't being dropped.
let (tx, rx) = transport::connector();
let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| {
future::empty()
.then(|_: Result<(), ()>| -> Result<(), IoError> { panic!() }) // <-- the test
});
swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap();
let dial_success = swarm_ctrl.dial("/memory".parse().unwrap(), tx).unwrap();
let future = dial_success.select(swarm_future.for_each(|_| Ok(())))
.map_err(|(err, _)| err);
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
}
}

View File

@ -1,759 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use fnv::FnvHashMap;
use futures::{future, sync::oneshot, task, Async, Future, Poll, IntoFuture};
use parking_lot::Mutex;
use {Multiaddr, MuxedTransport, SwarmController, Transport};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::mem;
use std::sync::{Arc, Weak, atomic::AtomicUsize, atomic::Ordering};
use transport::interruptible::Interrupt;
/// Storage for a unique connection with a remote.
pub struct UniqueConnec<T> {
inner: Arc<Mutex<UniqueConnecInner<T>>>,
}
enum UniqueConnecInner<T> {
/// The `UniqueConnec` was created, but nothing is in it.
Empty,
/// We started dialing, but no response has been obtained so far.
Pending {
/// Tasks that need to be awakened when the content of this object is set.
tasks_waiting: FnvHashMap<usize, task::Task>,
/// Future that represents when `tie_*` should have been called.
// TODO: Send + Sync bound is meh
dial_fut: Box<Future<Item = (), Error = IoError> + Send + Sync>,
/// Dropping this object will automatically interrupt the dial, which is very useful if
/// we clear or drop the `UniqueConnec`.
interrupt: Interrupt,
},
/// The value of this unique connec has been set.
/// Can only transition to `Empty` when the future has expired.
Full {
/// Content of the object.
value: T,
/// Sender to trigger if the content gets cleared.
on_clear: oneshot::Sender<()>,
},
/// The `dial_fut` has errored.
Errored(IoError),
}
impl<T> UniqueConnec<T> {
/// Builds a new empty `UniqueConnec`.
#[inline]
pub fn empty() -> Self {
UniqueConnec {
inner: Arc::new(Mutex::new(UniqueConnecInner::Empty)),
}
}
/// Builds a new `UniqueConnec` that contains a value.
#[inline]
pub fn with_value(value: T) -> Self {
let (on_clear, _) = oneshot::channel();
UniqueConnec {
inner: Arc::new(Mutex::new(UniqueConnecInner::Full { value, on_clear })),
}
}
/// Instantly returns the value from the object if there is any.
pub fn poll(&self) -> Option<T>
where T: Clone,
{
let inner = self.inner.lock();
if let UniqueConnecInner::Full { ref value, .. } = &*inner {
Some(value.clone())
} else {
None
}
}
/// Loads the value from the object.
///
/// If the object is empty or has errored earlier, dials the given multiaddress with the
/// given transport.
///
/// The closure of the `swarm` is expected to call `tie_*()` on the `UniqueConnec`. Failure
/// to do so will make the `UniqueConnecFuture` produce an error.
///
/// One critical property of this method, is that if a connection incomes and `tie_*` is
/// called, then it will be returned by the returned future.
#[inline]
pub fn dial<S, F, Du>(&self, swarm: &SwarmController<S, F>, multiaddr: &Multiaddr,
transport: Du) -> UniqueConnecFuture<T>
where T: Clone + Send + 'static, // TODO: 'static :-/
Du: Transport + 'static, // TODO: 'static :-/
Du::Output: Into<S::Output>,
Du::Dial: Send,
Du::MultiaddrFuture: Send,
S: Clone + MuxedTransport,
S::Dial: Send,
S::Listener: Send,
S::ListenerUpgrade: Send,
S::Output: Send,
S::MultiaddrFuture: Send,
F: 'static,
{
self.dial_inner(swarm, multiaddr, transport, true)
}
/// Same as `dial`, except that the future will produce an error if an earlier attempt to dial
/// has errored.
#[inline]
pub fn dial_if_empty<S, F, Du>(&self, swarm: &SwarmController<S, F>, multiaddr: &Multiaddr,
transport: Du) -> UniqueConnecFuture<T>
where T: Clone + Send + 'static, // TODO: 'static :-/
Du: Transport + 'static, // TODO: 'static :-/
Du::Output: Into<S::Output>,
Du::Dial: Send,
Du::MultiaddrFuture: Send,
S: Clone + MuxedTransport,
S::Dial: Send,
S::Listener: Send,
S::ListenerUpgrade: Send,
S::Output: Send,
S::MultiaddrFuture: Send,
F: 'static,
{
self.dial_inner(swarm, multiaddr, transport, false)
}
/// Inner implementation of `dial_*`.
fn dial_inner<S, F, Du>(&self, swarm: &SwarmController<S, F>, multiaddr: &Multiaddr,
transport: Du, dial_if_err: bool) -> UniqueConnecFuture<T>
where T: Clone + Send + 'static, // TODO: 'static :-/
Du: Transport + 'static, // TODO: 'static :-/
Du::Output: Into<S::Output>,
Du::Dial: Send,
Du::MultiaddrFuture: Send,
S: Clone + MuxedTransport,
S::Dial: Send,
S::Listener: Send,
S::ListenerUpgrade: Send,
S::Output: Send,
S::MultiaddrFuture: Send,
F: 'static,
{
let mut inner = self.inner.lock();
match &*inner {
UniqueConnecInner::Empty => (),
UniqueConnecInner::Errored(_) if dial_if_err => (),
_ => return UniqueConnecFuture { inner: Arc::downgrade(&self.inner) },
};
let weak_inner = Arc::downgrade(&self.inner);
let (transport, interrupt) = transport.interruptible();
let dial_fut = swarm.dial_then(multiaddr.clone(), transport,
move |val: Result<(), IoError>| {
let inner = match weak_inner.upgrade() {
Some(i) => i,
None => return val
};
let mut inner = inner.lock();
if let UniqueConnecInner::Full { .. } = *inner {
return val;
}
let new_val = UniqueConnecInner::Errored(match val {
Ok(()) => IoError::new(IoErrorKind::ConnectionRefused,
"dialing has succeeded but tie_* hasn't been called"),
Err(ref err) => IoError::new(err.kind(), err.to_string()),
});
match mem::replace(&mut *inner, new_val) {
UniqueConnecInner::Pending { tasks_waiting, .. } => {
for task in tasks_waiting {
task.1.notify();
}
},
_ => ()
};
val
});
let dial_fut = dial_fut
.map_err(|_| IoError::new(IoErrorKind::Other, "multiaddress not supported"))
.into_future()
.flatten();
*inner = UniqueConnecInner::Pending {
tasks_waiting: Default::default(),
dial_fut: Box::new(dial_fut),
interrupt,
};
UniqueConnecFuture { inner: Arc::downgrade(&self.inner) }
}
/// Puts `value` inside the object.
/// Additionally, the `UniqueConnec` will be tied to the `until` future. When the future drops
/// or finishes, the `UniqueConnec` is automatically cleared. If the `UniqueConnec` is cleared
/// by the user, the future automatically stops.
/// The returned future is an adjusted version of that same future.
///
/// If the object already contains something, then `until` is dropped and a dummy future that
/// immediately ends is returned.
pub fn tie_or_stop<F>(&self, value: T, until: F) -> impl Future<Item = (), Error = F::Error>
where F: Future<Item = ()>
{
self.tie_inner(value, until, false)
}
/// Same as `tie_or_stop`, except that is if the object already contains something, then
/// `until` is returned immediately and can live in parallel.
pub fn tie_or_passthrough<F>(&self, value: T, until: F) -> impl Future<Item = (), Error = F::Error>
where F: Future<Item = ()>
{
self.tie_inner(value, until, true)
}
/// Inner implementation of `tie_*`.
fn tie_inner<F>(&self, value: T, until: F, pass_through: bool) -> impl Future<Item = (), Error = F::Error>
where F: Future<Item = ()>
{
let mut tasks_to_notify = Default::default();
let mut inner = self.inner.lock();
let (on_clear, on_clear_rx) = oneshot::channel();
match mem::replace(&mut *inner, UniqueConnecInner::Full { value, on_clear }) {
UniqueConnecInner::Empty => {},
UniqueConnecInner::Errored(_) => {},
UniqueConnecInner::Pending { tasks_waiting, .. } => {
tasks_to_notify = tasks_waiting;
},
old @ UniqueConnecInner::Full { .. } => {
// Keep the old value.
*inner = old;
if pass_through {
return future::Either::B(future::Either::A(until));
} else {
return future::Either::B(future::Either::B(future::ok(())));
}
},
};
drop(inner);
struct Cleaner<T>(Weak<Mutex<UniqueConnecInner<T>>>);
impl<T> Drop for Cleaner<T> {
#[inline]
fn drop(&mut self) {
if let Some(inner) = self.0.upgrade() {
*inner.lock() = UniqueConnecInner::Empty;
}
}
}
let cleaner = Cleaner(Arc::downgrade(&self.inner));
// The mutex is unlocked when we notify the pending tasks.
for task in tasks_to_notify {
task.1.notify();
}
let fut = until
.select(on_clear_rx.then(|_| Ok(())))
.map(|((), _)| ())
.map_err(|(err, _)| err)
.then(move |val| {
drop(cleaner); // Make sure that `cleaner` gets called there.
val
});
future::Either::A(fut)
}
/// Clears the content of the object.
///
/// Has no effect if the content is empty or pending.
/// If the node was full, calling `clear` will stop the future returned by `tie_*`.
pub fn clear(&self) {
let mut inner = self.inner.lock();
match mem::replace(&mut *inner, UniqueConnecInner::Empty) {
UniqueConnecInner::Empty => {},
UniqueConnecInner::Errored(_) => {},
pending @ UniqueConnecInner::Pending { .. } => {
*inner = pending;
},
UniqueConnecInner::Full { on_clear, .. } => {
// TODO: Should we really replace the `Full` with an `Empty` here? What about
// letting dropping the future clear the connection automatically? Otherwise
// it is possible that the user dials before the future gets dropped, in which
// case the future dropping will set the value to `Empty`. But on the other hand,
// it is expected that `clear()` is instantaneous and if it is followed with
// `dial()` then it should dial.
let _ = on_clear.send(());
},
};
}
/// Returns the state of the object.
///
/// Note that this can be racy, as the object can be used at the same time. In other words,
/// the returned value may no longer reflect the actual state.
pub fn state(&self) -> UniqueConnecState {
match *self.inner.lock() {
UniqueConnecInner::Empty => UniqueConnecState::Empty,
UniqueConnecInner::Errored(_) => UniqueConnecState::Errored,
UniqueConnecInner::Pending { .. } => UniqueConnecState::Pending,
UniqueConnecInner::Full { .. } => UniqueConnecState::Full,
}
}
/// Returns true if the object has a pending or active connection. Returns false if the object
/// is empty or the connection has errored earlier.
#[inline]
pub fn is_alive(&self) -> bool {
match self.state() {
UniqueConnecState::Empty => false,
UniqueConnecState::Errored => false,
UniqueConnecState::Pending => true,
UniqueConnecState::Full => true,
}
}
}
impl<T> Clone for UniqueConnec<T> {
#[inline]
fn clone(&self) -> UniqueConnec<T> {
UniqueConnec {
inner: self.inner.clone(),
}
}
}
impl<T> Default for UniqueConnec<T> {
#[inline]
fn default() -> Self {
UniqueConnec::empty()
}
}
impl<T> Drop for UniqueConnec<T> {
fn drop(&mut self) {
// Notify the waiting futures if we are the last `UniqueConnec`.
if let Some(inner) = Arc::get_mut(&mut self.inner) {
match *inner.get_mut() {
UniqueConnecInner::Pending { ref mut tasks_waiting, .. } => {
for task in tasks_waiting.drain() {
task.1.notify();
}
},
_ => ()
}
}
}
}
/// Future returned by `UniqueConnec::dial()`.
#[must_use = "futures do nothing unless polled"]
pub struct UniqueConnecFuture<T> {
inner: Weak<Mutex<UniqueConnecInner<T>>>,
}
impl<T> Future for UniqueConnecFuture<T>
where T: Clone
{
type Item = T;
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let inner = match self.inner.upgrade() {
Some(inner) => inner,
// All the `UniqueConnec` have been destroyed.
None => return Err(IoErrorKind::ConnectionAborted.into()),
};
let mut inner = inner.lock();
match mem::replace(&mut *inner, UniqueConnecInner::Empty) {
UniqueConnecInner::Empty => {
// This can happen if `tie_*()` is called, and the future expires before the
// future returned by `dial()` gets polled. This means that the connection has been
// closed.
Err(IoErrorKind::ConnectionAborted.into())
},
UniqueConnecInner::Pending { mut tasks_waiting, mut dial_fut, interrupt } => {
match dial_fut.poll() {
Ok(Async::Ready(())) => {
// This happens if we successfully dialed a remote, but the callback
// doesn't call `tie_*`. This can be a logic error by the user,
// but could also indicate that the user decided to filter out this
// connection for whatever reason.
*inner = UniqueConnecInner::Errored(IoErrorKind::ConnectionAborted.into());
Err(IoErrorKind::ConnectionAborted.into())
},
Ok(Async::NotReady) => {
static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0);
task_local! {
static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed)
}
tasks_waiting.insert(TASK_ID.with(|&k| k), task::current());
*inner = UniqueConnecInner::Pending { tasks_waiting, dial_fut, interrupt };
Ok(Async::NotReady)
}
Err(err) => {
let tr = IoError::new(err.kind(), err.to_string());
*inner = UniqueConnecInner::Errored(err);
Err(tr)
},
}
},
UniqueConnecInner::Full { value, on_clear } => {
*inner = UniqueConnecInner::Full {
value: value.clone(),
on_clear
};
Ok(Async::Ready(value))
},
UniqueConnecInner::Errored(err) => {
let tr = IoError::new(err.kind(), err.to_string());
*inner = UniqueConnecInner::Errored(err);
Err(tr)
},
}
}
}
/// State of a `UniqueConnec`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum UniqueConnecState {
/// The object is empty.
Empty,
/// `dial` has been called and we are waiting for `tie_*` to be called.
Pending,
/// `tie_*` has been called.
Full,
/// The future returned by the closure of `dial` has errored or has finished before
/// `tie_*` has been called.
Errored,
}
#[cfg(test)]
mod tests {
use futures::{future, sync::oneshot, Future, Stream};
use transport::DeniedTransport;
use std::io::Error as IoError;
use std::sync::{Arc, atomic};
use std::time::Duration;
use {UniqueConnec, UniqueConnecState};
use {swarm, transport, Transport};
use tokio::runtime::current_thread;
use tokio_timer;
#[test]
fn basic_working() {
// Checks the basic working of the `UniqueConnec`.
let (tx, rx) = transport::connector();
let unique_connec = UniqueConnec::empty();
let unique_connec2 = unique_connec.clone();
assert_eq!(unique_connec.state(), UniqueConnecState::Empty);
let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| {
// Note that this handles both the dial and the listen.
assert!(unique_connec2.is_alive());
unique_connec2.tie_or_stop(12, future::empty())
});
swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap();
let dial_success = unique_connec
.dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx)
.map(|val| { assert_eq!(val, 12); });
assert_eq!(unique_connec.state(), UniqueConnecState::Pending);
let future = dial_success.select(swarm_future.for_each(|_| Ok(()))).map_err(|(err, _)| err);
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
assert_eq!(unique_connec.state(), UniqueConnecState::Full);
}
#[test]
fn invalid_multiaddr_produces_error() {
// Tests that passing an invalid multiaddress generates an error.
let unique = UniqueConnec::empty();
assert_eq!(unique.state(), UniqueConnecState::Empty);
let unique2 = unique.clone();
let (swarm_ctrl, _swarm_fut) = swarm(DeniedTransport, |_, _| {
unique2.tie_or_stop((), future::empty())
});
let fut = unique.dial(&swarm_ctrl, &"/ip4/1.2.3.4".parse().unwrap(), DeniedTransport);
assert!(fut.wait().is_err());
assert_eq!(unique.state(), UniqueConnecState::Errored);
}
#[test]
fn tie_or_stop_stops() {
// Tests that `tie_or_stop` destroys additional futures passed to it.
let (tx, rx) = transport::connector();
let unique_connec = UniqueConnec::empty();
let unique_connec2 = unique_connec.clone();
// This channel is used to detect whether the future has been dropped.
let (msg_tx, msg_rx) = oneshot::channel();
let mut num_connec = 0;
let mut msg_rx = Some(msg_rx);
let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| {
num_connec += 1;
if num_connec == 1 {
unique_connec2.tie_or_stop(12, future::Either::A(future::empty()))
} else {
let fut = msg_rx.take().unwrap().map_err(|_| panic!());
unique_connec2.tie_or_stop(13, future::Either::B(fut))
}
});
swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap();
let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| {
future::empty()
});
let dial_success = unique_connec
.dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx.clone())
.map(|val| { assert_eq!(val, 12); })
.inspect({
let c = unique_connec.clone();
move |_| { assert!(c.is_alive()); }
})
.and_then(|_| {
tokio_timer::sleep(Duration::from_secs(1))
.map_err(|_| unreachable!())
})
.and_then(move |_| {
swarm_ctrl2.dial("/memory".parse().unwrap(), tx)
.unwrap_or_else(|_| panic!())
})
.inspect({
let c = unique_connec.clone();
move |_| {
assert_eq!(c.poll(), Some(12)); // Not 13
assert!(msg_tx.send(()).is_err());
}
});
let future = dial_success
.select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err)
.select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err);
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
assert!(unique_connec.is_alive());
}
#[test]
fn tie_or_passthrough_passes_through() {
// Tests that `tie_or_passthrough` doesn't delete additional futures passed to it when
// it is already full, and doesn't gets its value modified when that happens.
let (tx, rx) = transport::connector();
let unique_connec = UniqueConnec::empty();
let unique_connec2 = unique_connec.clone();
let mut num = 12;
let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), move |_, _| {
// Note that this handles both the dial and the listen.
let fut = future::empty().then(|_: Result<(), ()>| -> Result<(), IoError> { panic!() });
num += 1;
unique_connec2.tie_or_passthrough(num, fut)
});
swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap();
let dial_success = unique_connec
.dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx.clone())
.map(|val| { assert_eq!(val, 13); });
swarm_ctrl.dial("/memory".parse().unwrap(), tx)
.unwrap();
let future = dial_success.select(swarm_future.for_each(|_| Ok(()))).map_err(|(err, _)| err);
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
assert_eq!(unique_connec.poll(), Some(13));
}
#[test]
fn cleared_when_future_drops() {
// Tests that the `UniqueConnec` gets cleared when the future we associate with it gets
// destroyed.
let (tx, rx) = transport::connector();
let unique_connec = UniqueConnec::empty();
let unique_connec2 = unique_connec.clone();
let (msg_tx, msg_rx) = oneshot::channel();
let mut msg_rx = Some(msg_rx);
let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| {
future::empty()
});
swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap();
let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| {
let fut = msg_rx.take().unwrap().map_err(|_| -> IoError { unreachable!() });
unique_connec2.tie_or_stop(12, fut)
});
let dial_success = unique_connec
.dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx)
.map(|val| { assert_eq!(val, 12); })
.inspect({
let c = unique_connec.clone();
move |_| { assert!(c.is_alive()); }
})
.and_then(|_| {
msg_tx.send(()).unwrap();
tokio_timer::sleep(Duration::from_secs(1))
.map_err(|_| unreachable!())
})
.inspect({
let c = unique_connec.clone();
move |_| { assert!(!c.is_alive()); }
});
let future = dial_success
.select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err)
.select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err);
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
assert!(!unique_connec.is_alive());
}
#[test]
fn future_drops_when_cleared() {
// Tests that the future returned by `tie_or_*` ends when the `UniqueConnec` get cleared.
let (tx, rx) = transport::connector();
let unique_connec = UniqueConnec::empty();
let unique_connec2 = unique_connec.clone();
let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| {
future::empty()
});
swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap();
let finished = Arc::new(atomic::AtomicBool::new(false));
let finished2 = finished.clone();
let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| {
let finished2 = finished2.clone();
unique_connec2.tie_or_stop(12, future::empty()).then(move |v| {
finished2.store(true, atomic::Ordering::Relaxed);
v
})
});
let dial_success = unique_connec
.dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx)
.map(|val| { assert_eq!(val, 12); })
.inspect({
let c = unique_connec.clone();
move |_| {
assert!(c.is_alive());
c.clear();
assert!(!c.is_alive());
}
})
.and_then(|_| {
tokio_timer::sleep(Duration::from_secs(1))
.map_err(|_| unreachable!())
})
.inspect({
let c = unique_connec.clone();
move |_| {
assert!(finished.load(atomic::Ordering::Relaxed));
assert!(!c.is_alive());
}
});
let future = dial_success
.select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err)
.select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err);
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
assert!(!unique_connec.is_alive());
}
#[test]
fn future_drops_when_destroyed() {
// Tests that the future returned by `tie_or_*` ends when the `UniqueConnec` get dropped.
let (tx, rx) = transport::connector();
let unique_connec = UniqueConnec::empty();
let mut unique_connec2 = Some(unique_connec.clone());
let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| {
future::empty()
});
swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap();
let finished = Arc::new(atomic::AtomicBool::new(false));
let finished2 = finished.clone();
let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| {
let finished2 = finished2.clone();
unique_connec2.take().unwrap().tie_or_stop(12, future::empty()).then(move |v| {
finished2.store(true, atomic::Ordering::Relaxed);
v
})
});
let dial_success = unique_connec
.dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx)
.map(|val| { assert_eq!(val, 12); })
.inspect(move |_| {
assert!(unique_connec.is_alive());
drop(unique_connec);
})
.and_then(|_| {
tokio_timer::sleep(Duration::from_secs(1))
.map_err(|_| unreachable!())
})
.inspect(move |_| {
assert!(finished.load(atomic::Ordering::Relaxed));
});
let future = dial_success
.select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err)
.select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err);
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
}
#[test]
fn error_if_unique_connec_destroyed_before_future() {
// Tests that the future returned by `dial` returns an error if the `UniqueConnec` no
// longer exists.
let (tx, rx) = transport::connector();
let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), move |_, _| {
future::empty()
});
swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap();
let unique_connec = UniqueConnec::empty();
let dial_success = unique_connec
.dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx)
.then(|val: Result<(), IoError>| {
assert!(val.is_err());
Ok(())
});
drop(unique_connec);
let future = dial_success
.select(swarm_future.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err);
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
}
// TODO: test that dialing is interrupted when UniqueConnec is cleared
// TODO: test that dialing is interrupted when UniqueConnec is dropped
}