mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-18 12:31:22 +00:00
Rework swarm and allow interrupting a dial (#366)
* Rework swarm and allow interrupting a dial * Improve the UniqueConnec situation * Remove UniqueConnec::get * Rename `get_or_dial()` to `dial()` and add `dial_if_empty()` * Clean the UniqueConnec is the future is dropped * Rename `set_until` to `tie_or_stop` and add `tie_or_passthrough` * Add some tests, docs * Fix memory leak with tasks registration * Interrupt dialing when a UniqueConnec is dropped or cleared
This commit is contained in:
committed by
Benjamin Kampmann
parent
e2618dc1b3
commit
ea881e3dfa
@ -25,5 +25,7 @@ libp2p-ping = { path = "../ping" }
|
|||||||
libp2p-tcp-transport = { path = "../tcp-transport" }
|
libp2p-tcp-transport = { path = "../tcp-transport" }
|
||||||
libp2p-mplex = { path = "../mplex" }
|
libp2p-mplex = { path = "../mplex" }
|
||||||
rand = "0.5"
|
rand = "0.5"
|
||||||
|
tokio = "0.1"
|
||||||
tokio-codec = "0.1"
|
tokio-codec = "0.1"
|
||||||
tokio-current-thread = "0.1"
|
tokio-current-thread = "0.1"
|
||||||
|
tokio-timer = "0.2"
|
||||||
|
@ -227,9 +227,13 @@ extern crate tokio_io;
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
extern crate rand;
|
extern crate rand;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
extern crate tokio;
|
||||||
|
#[cfg(test)]
|
||||||
extern crate tokio_codec;
|
extern crate tokio_codec;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
extern crate tokio_current_thread;
|
extern crate tokio_current_thread;
|
||||||
|
#[cfg(test)]
|
||||||
|
extern crate tokio_timer;
|
||||||
|
|
||||||
/// Multi-address re-export.
|
/// Multi-address re-export.
|
||||||
pub extern crate multiaddr;
|
pub extern crate multiaddr;
|
||||||
|
@ -18,11 +18,14 @@
|
|||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
use futures::stream::{FuturesUnordered, StreamFuture};
|
use futures::stream::StreamFuture;
|
||||||
use futures::sync::{mpsc, oneshot};
|
use futures::sync::oneshot;
|
||||||
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
|
use futures::{Async, Future, IntoFuture, Poll, Stream};
|
||||||
|
use futures::task;
|
||||||
|
use parking_lot::Mutex;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||||
|
use std::sync::Arc;
|
||||||
use {Multiaddr, MuxedTransport, Transport};
|
use {Multiaddr, MuxedTransport, Transport};
|
||||||
|
|
||||||
/// Creates a swarm.
|
/// Creates a swarm.
|
||||||
@ -32,38 +35,33 @@ use {Multiaddr, MuxedTransport, Transport};
|
|||||||
///
|
///
|
||||||
/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to
|
/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to
|
||||||
/// control, and the `Future` must be driven to completion in order for things to work.
|
/// control, and the `Future` must be driven to completion in order for things to work.
|
||||||
///
|
|
||||||
pub fn swarm<T, H, F>(
|
pub fn swarm<T, H, F>(
|
||||||
transport: T,
|
transport: T,
|
||||||
handler: H,
|
handler: H,
|
||||||
) -> (SwarmController<T>, SwarmFuture<T, H, F::Future>)
|
) -> (SwarmController<T>, SwarmFuture<T, H>)
|
||||||
where
|
where
|
||||||
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
|
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
|
||||||
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> F,
|
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> F,
|
||||||
F: IntoFuture<Item = (), Error = IoError>,
|
F: IntoFuture<Item = (), Error = IoError>,
|
||||||
{
|
{
|
||||||
let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded();
|
let shared = Arc::new(Mutex::new(Shared {
|
||||||
let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded();
|
next_incoming: transport.clone().next_incoming(),
|
||||||
let (new_toprocess_tx, new_toprocess_rx) = mpsc::unbounded();
|
listeners: Vec::new(),
|
||||||
|
listeners_upgrade: Vec::new(),
|
||||||
|
dialers: Vec::new(),
|
||||||
|
to_process: Vec::new(),
|
||||||
|
task_to_notify: None,
|
||||||
|
}));
|
||||||
|
|
||||||
let future = SwarmFuture {
|
let future = SwarmFuture {
|
||||||
transport: transport.clone(),
|
transport: transport.clone(),
|
||||||
|
shared: shared.clone(),
|
||||||
handler: handler,
|
handler: handler,
|
||||||
new_listeners: new_listeners_rx,
|
|
||||||
next_incoming: transport.clone().next_incoming(),
|
|
||||||
listeners: FuturesUnordered::new(),
|
|
||||||
listeners_upgrade: FuturesUnordered::new(),
|
|
||||||
dialers: FuturesUnordered::new(),
|
|
||||||
new_dialers: new_dialers_rx,
|
|
||||||
to_process: FuturesUnordered::new(),
|
|
||||||
new_toprocess: new_toprocess_rx,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let controller = SwarmController {
|
let controller = SwarmController {
|
||||||
transport: transport,
|
transport,
|
||||||
new_listeners: new_listeners_tx,
|
shared,
|
||||||
new_dialers: new_dialers_tx,
|
|
||||||
new_toprocess: new_toprocess_tx,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
(controller, future)
|
(controller, future)
|
||||||
@ -74,10 +72,11 @@ pub struct SwarmController<T>
|
|||||||
where
|
where
|
||||||
T: MuxedTransport + 'static, // TODO: 'static :-/
|
T: MuxedTransport + 'static, // TODO: 'static :-/
|
||||||
{
|
{
|
||||||
|
/// Shared between the swarm infrastructure.
|
||||||
|
shared: Arc<Mutex<Shared<T>>>,
|
||||||
|
|
||||||
|
/// Transport used to dial or listen.
|
||||||
transport: T,
|
transport: T,
|
||||||
new_listeners: mpsc::UnboundedSender<T::Listener>,
|
|
||||||
new_dialers: mpsc::UnboundedSender<Box<Future<Item = (T::Output, oneshot::Sender<Result<(), IoError>>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = ()>>>,
|
|
||||||
new_toprocess: mpsc::UnboundedSender<Box<Future<Item = (), Error = IoError>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> fmt::Debug for SwarmController<T>
|
impl<T> fmt::Debug for SwarmController<T>
|
||||||
@ -98,9 +97,7 @@ where
|
|||||||
fn clone(&self) -> SwarmController<T> {
|
fn clone(&self) -> SwarmController<T> {
|
||||||
SwarmController {
|
SwarmController {
|
||||||
transport: self.transport.clone(),
|
transport: self.transport.clone(),
|
||||||
new_listeners: self.new_listeners.clone(),
|
shared: self.shared.clone(),
|
||||||
new_dialers: self.new_dialers.clone(),
|
|
||||||
new_toprocess: self.new_toprocess.clone(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -116,33 +113,62 @@ where
|
|||||||
/// Returns a future that is signalled once the closure in the `swarm` has returned its future.
|
/// Returns a future that is signalled once the closure in the `swarm` has returned its future.
|
||||||
/// Therefore if the closure in the swarm has some side effect (eg. write something in a
|
/// Therefore if the closure in the swarm has some side effect (eg. write something in a
|
||||||
/// variable), this side effect will be observable when this future succeeds.
|
/// variable), this side effect will be observable when this future succeeds.
|
||||||
|
#[inline]
|
||||||
pub fn dial<Du>(&self, multiaddr: Multiaddr, transport: Du)
|
pub fn dial<Du>(&self, multiaddr: Multiaddr, transport: Du)
|
||||||
-> Result<impl Future<Item = (), Error = IoError>, Multiaddr>
|
-> Result<impl Future<Item = (), Error = IoError>, Multiaddr>
|
||||||
where
|
where
|
||||||
Du: Transport + 'static, // TODO: 'static :-/
|
Du: Transport + 'static, // TODO: 'static :-/
|
||||||
Du::Output: Into<T::Output>,
|
Du::Output: Into<T::Output>,
|
||||||
|
{
|
||||||
|
self.dial_then(multiaddr, transport, |v| v)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Internal version of `dial` that allows adding a closure that is called after either the
|
||||||
|
/// dialing fails or the handler has been called with the resulting future.
|
||||||
|
///
|
||||||
|
/// The returned future is filled with the output of `then`.
|
||||||
|
pub(crate) fn dial_then<Du, F>(&self, multiaddr: Multiaddr, transport: Du, then: F)
|
||||||
|
-> Result<impl Future<Item = (), Error = IoError>, Multiaddr>
|
||||||
|
where
|
||||||
|
Du: Transport + 'static, // TODO: 'static :-/
|
||||||
|
Du::Output: Into<T::Output>,
|
||||||
|
F: FnOnce(Result<(), IoError>) -> Result<(), IoError> + 'static,
|
||||||
{
|
{
|
||||||
trace!("Swarm dialing {}", multiaddr);
|
trace!("Swarm dialing {}", multiaddr);
|
||||||
|
|
||||||
match transport.dial(multiaddr.clone()) {
|
match transport.dial(multiaddr.clone()) {
|
||||||
Ok(dial) => {
|
Ok(dial) => {
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let mut then = Some(move |val| {
|
||||||
|
let _ = tx.send(then(val));
|
||||||
|
});
|
||||||
|
// Unfortunately the `Box<FnOnce(_)>` type is still unusable in Rust right now,
|
||||||
|
// so we use a `Box<FnMut(_)>` instead and panic if it is called multiple times.
|
||||||
|
let mut then = Box::new(move |val: Result<(), IoError>| {
|
||||||
|
let then = then.take().expect("The Boxed FnMut should only be called once");
|
||||||
|
then(val);
|
||||||
|
}) as Box<FnMut(_)>;
|
||||||
|
|
||||||
let dial = dial.then(|result| {
|
let dial = dial.then(|result| {
|
||||||
match result {
|
match result {
|
||||||
Ok((output, client_addr)) => {
|
Ok((output, client_addr)) => {
|
||||||
let client_addr = Box::new(client_addr) as Box<Future<Item = _, Error = _>>;
|
let client_addr = Box::new(client_addr) as Box<Future<Item = _, Error = _>>;
|
||||||
Ok((output.into(), tx, client_addr))
|
Ok((output.into(), then, client_addr))
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
debug!("Error in dialer upgrade: {:?}", err);
|
debug!("Error in dialer upgrade: {:?}", err);
|
||||||
let _ = tx.send(Err(err));
|
then(Err(err));
|
||||||
Err(())
|
Err(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
// Ignoring errors if the receiver has been closed, because in that situation
|
|
||||||
// nothing is going to be processed anyway.
|
let mut shared = self.shared.lock();
|
||||||
let _ = self.new_dialers.unbounded_send(Box::new(dial) as Box<_>);
|
shared.dialers.push((multiaddr, Box::new(dial) as Box<_>));
|
||||||
|
if let Some(task) = shared.task_to_notify.take() {
|
||||||
|
task.notify();
|
||||||
|
}
|
||||||
|
|
||||||
Ok(rx.then(|result| {
|
Ok(rx.then(|result| {
|
||||||
match result {
|
match result {
|
||||||
Ok(Ok(())) => Ok(()),
|
Ok(Ok(())) => Ok(()),
|
||||||
@ -156,15 +182,38 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Interrupts all dialing attempts to a specific multiaddress.
|
||||||
|
///
|
||||||
|
/// Has no effect if the dialing attempt has already succeeded, in which case it will be
|
||||||
|
/// dispatched to the handler.
|
||||||
|
pub fn interrupt_dial(&self, multiaddr: &Multiaddr) {
|
||||||
|
let mut shared = self.shared.lock();
|
||||||
|
shared.dialers.retain(|dialer| {
|
||||||
|
&dialer.0 != multiaddr
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that
|
/// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that
|
||||||
/// was passed to `swarm`.
|
/// was passed to `swarm`.
|
||||||
|
// TODO: add a way to cancel a listener
|
||||||
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
|
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
|
||||||
match self.transport.clone().listen_on(multiaddr) {
|
match self.transport.clone().listen_on(multiaddr) {
|
||||||
Ok((listener, new_addr)) => {
|
Ok((listener, new_addr)) => {
|
||||||
trace!("Swarm listening on {}", new_addr);
|
trace!("Swarm listening on {}", new_addr);
|
||||||
// Ignoring errors if the receiver has been closed, because in that situation
|
let mut shared = self.shared.lock();
|
||||||
// nothing is going to be processed anyway.
|
let listener = Box::new(
|
||||||
let _ = self.new_listeners.unbounded_send(listener);
|
listener.map(|f| {
|
||||||
|
let f = f.map(|(out, maf)| {
|
||||||
|
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
|
||||||
|
});
|
||||||
|
|
||||||
|
Box::new(f) as Box<Future<Item = _, Error = _>>
|
||||||
|
}),
|
||||||
|
) as Box<Stream<Item = _, Error = _>>;
|
||||||
|
shared.listeners.push(listener.into_future());
|
||||||
|
if let Some(task) = shared.task_to_notify.take() {
|
||||||
|
task.notify();
|
||||||
|
}
|
||||||
Ok(new_addr)
|
Ok(new_addr)
|
||||||
}
|
}
|
||||||
Err((_, multiaddr)) => Err(multiaddr),
|
Err((_, multiaddr)) => Err(multiaddr),
|
||||||
@ -173,15 +222,152 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Future that must be driven to completion in order for the swarm to work.
|
/// Future that must be driven to completion in order for the swarm to work.
|
||||||
pub struct SwarmFuture<T, H, F>
|
pub struct SwarmFuture<T, H>
|
||||||
where
|
where
|
||||||
T: MuxedTransport + 'static, // TODO: 'static :-/
|
T: MuxedTransport + 'static, // TODO: 'static :-/
|
||||||
{
|
{
|
||||||
|
/// Shared between the swarm infrastructure.
|
||||||
|
shared: Arc<Mutex<Shared<T>>>,
|
||||||
|
|
||||||
|
/// The transport used to dial.
|
||||||
transport: T,
|
transport: T,
|
||||||
|
|
||||||
|
/// Swarm handler.
|
||||||
handler: H,
|
handler: H,
|
||||||
new_listeners: mpsc::UnboundedReceiver<T::Listener>,
|
}
|
||||||
|
|
||||||
|
impl<T, H, If, F> Future for SwarmFuture<T, H>
|
||||||
|
where
|
||||||
|
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
|
||||||
|
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> If,
|
||||||
|
If: IntoFuture<Future = F, Item = (), Error = IoError>,
|
||||||
|
F: Future<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
|
||||||
|
{
|
||||||
|
type Item = ();
|
||||||
|
type Error = IoError;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
let mut shared = self.shared.lock();
|
||||||
|
let handler = &mut self.handler;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match shared.next_incoming.poll() {
|
||||||
|
Ok(Async::Ready(connec)) => {
|
||||||
|
debug!("Swarm received new multiplexed incoming connection");
|
||||||
|
shared.next_incoming = self.transport.clone().next_incoming();
|
||||||
|
let connec = connec.map(|(out, maf)| {
|
||||||
|
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
|
||||||
|
});
|
||||||
|
shared.listeners_upgrade.push(Box::new(connec) as Box<_>);
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => break,
|
||||||
|
Err(err) => {
|
||||||
|
// TODO: should that stop everything?
|
||||||
|
debug!("Error in multiplexed incoming connection: {:?}", err);
|
||||||
|
shared.next_incoming = self.transport.clone().next_incoming();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We remove each element from `shared.listeners` one by one and add them back only
|
||||||
|
// if relevant.
|
||||||
|
for n in (0 .. shared.listeners.len()).rev() {
|
||||||
|
let mut listener = shared.listeners.swap_remove(n);
|
||||||
|
loop {
|
||||||
|
match listener.poll() {
|
||||||
|
Ok(Async::Ready((Some(upgrade), remaining))) => {
|
||||||
|
trace!("Swarm received new connection on listener socket");
|
||||||
|
shared.listeners_upgrade.push(upgrade);
|
||||||
|
listener = remaining.into_future();
|
||||||
|
}
|
||||||
|
Ok(Async::Ready((None, _))) => {
|
||||||
|
debug!("Listener closed gracefully");
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
Err((err, _)) => {
|
||||||
|
debug!("Error in listener: {:?}", err);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
shared.listeners.push(listener);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We remove each element from `shared.listeners_upgrade` one by one and add them back
|
||||||
|
// only if relevant.
|
||||||
|
for n in (0 .. shared.listeners_upgrade.len()).rev() {
|
||||||
|
let mut listener_upgrade = shared.listeners_upgrade.swap_remove(n);
|
||||||
|
match listener_upgrade.poll() {
|
||||||
|
Ok(Async::Ready((output, client_addr))) => {
|
||||||
|
debug!("Successfully upgraded incoming connection");
|
||||||
|
// TODO: unlock mutex before calling handler, in order to avoid deadlocks if
|
||||||
|
// the user does something stupid
|
||||||
|
shared.to_process.push(Box::new(handler(output, client_addr).into_future()));
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
debug!("Error in listener upgrade: {:?}", err);
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
shared.listeners_upgrade.push(listener_upgrade);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We remove each element from `shared.dialers` one by one and add them back only
|
||||||
|
// if relevant.
|
||||||
|
for n in (0 .. shared.dialers.len()).rev() {
|
||||||
|
let (client_addr, mut dialer) = shared.dialers.swap_remove(n);
|
||||||
|
match dialer.poll() {
|
||||||
|
Ok(Async::Ready((output, mut notifier, addr))) => {
|
||||||
|
trace!("Successfully upgraded dialed connection");
|
||||||
|
// TODO: unlock mutex before calling handler, in order to avoid deadlocks if
|
||||||
|
// the user does something stupid
|
||||||
|
shared.to_process.push(Box::new(handler(output, addr).into_future()));
|
||||||
|
notifier(Ok(()));
|
||||||
|
}
|
||||||
|
Err(()) => {},
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
shared.dialers.push((client_addr, dialer));
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We remove each element from `shared.to_process` one by one and add them back only
|
||||||
|
// if relevant.
|
||||||
|
for n in (0 .. shared.to_process.len()).rev() {
|
||||||
|
let mut to_process = shared.to_process.swap_remove(n);
|
||||||
|
match to_process.poll() {
|
||||||
|
Ok(Async::Ready(())) => {
|
||||||
|
trace!("Future returned by swarm handler driven to completion");
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
debug!("Error in processing: {:?}", err);
|
||||||
|
}
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
shared.to_process.push(to_process);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: we never return `Ok(Ready)` because there's no way to know whether
|
||||||
|
// `next_incoming()` can produce anything more in the future ; also we would need to
|
||||||
|
// know when the controller has been dropped
|
||||||
|
shared.task_to_notify = Some(task::current());
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: stronger typing
|
||||||
|
struct Shared<T> where T: MuxedTransport + 'static {
|
||||||
|
/// Next incoming substream on the transport.
|
||||||
next_incoming: T::Incoming,
|
next_incoming: T::Incoming,
|
||||||
listeners: FuturesUnordered<
|
|
||||||
|
/// All the active listeners.
|
||||||
|
listeners: Vec<
|
||||||
StreamFuture<
|
StreamFuture<
|
||||||
Box<
|
Box<
|
||||||
Stream<
|
Stream<
|
||||||
@ -191,162 +377,33 @@ where
|
|||||||
>,
|
>,
|
||||||
>,
|
>,
|
||||||
>,
|
>,
|
||||||
|
|
||||||
|
/// Futures that upgrade an incoming listening connection to a full connection.
|
||||||
listeners_upgrade:
|
listeners_upgrade:
|
||||||
FuturesUnordered<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
|
Vec<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
|
||||||
dialers: FuturesUnordered<Box<Future<Item = (T::Output, oneshot::Sender<Result<(), IoError>>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = ()>>>,
|
|
||||||
new_dialers:
|
|
||||||
mpsc::UnboundedReceiver<Box<Future<Item = (T::Output, oneshot::Sender<Result<(), IoError>>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = ()>>>,
|
|
||||||
to_process: FuturesUnordered<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
|
|
||||||
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, H, If, F> Future for SwarmFuture<T, H, F>
|
/// Futures that dial a remote address.
|
||||||
where
|
///
|
||||||
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
|
/// Contains the address we dial, so that we can cancel it if necessary.
|
||||||
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> If,
|
dialers: Vec<(Multiaddr, Box<Future<Item = (T::Output, Box<FnMut(Result<(), IoError>)>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = ()>>)>,
|
||||||
If: IntoFuture<Future = F, Item = (), Error = IoError>,
|
|
||||||
F: Future<Item = (), Error = IoError>,
|
|
||||||
{
|
|
||||||
type Item = ();
|
|
||||||
type Error = IoError;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
/// List of futures produced by the swarm closure. Must be processed to the end.
|
||||||
let handler = &mut self.handler;
|
to_process: Vec<Box<Future<Item = (), Error = IoError>>>,
|
||||||
|
|
||||||
loop {
|
/// The task to notify whenever we add a new element in one of the lists.
|
||||||
match self.next_incoming.poll() {
|
/// Necessary so that the task wakes up and the element gets polled.
|
||||||
Ok(Async::Ready(connec)) => {
|
task_to_notify: Option<task::Task>,
|
||||||
debug!("Swarm received new multiplexed incoming connection");
|
|
||||||
self.next_incoming = self.transport.clone().next_incoming();
|
|
||||||
let connec = connec.map(|(out, maf)| {
|
|
||||||
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
|
|
||||||
});
|
|
||||||
self.listeners_upgrade.push(Box::new(connec) as Box<_>);
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => break,
|
|
||||||
Err(err) => {
|
|
||||||
debug!("Error in multiplexed incoming connection: {:?}", err);
|
|
||||||
self.next_incoming = self.transport.clone().next_incoming();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match self.new_listeners.poll() {
|
|
||||||
Ok(Async::Ready(Some(new_listener))) => {
|
|
||||||
let new_listener = Box::new(
|
|
||||||
new_listener.map(|f| {
|
|
||||||
let f = f.map(|(out, maf)| {
|
|
||||||
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
|
|
||||||
});
|
|
||||||
|
|
||||||
Box::new(f) as Box<Future<Item = _, Error = _>>
|
|
||||||
}),
|
|
||||||
) as Box<Stream<Item = _, Error = _>>;
|
|
||||||
self.listeners.push(new_listener.into_future());
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(None)) | Err(_) => {
|
|
||||||
// New listener sender has been closed.
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match self.new_dialers.poll() {
|
|
||||||
Ok(Async::Ready(Some(new_dialer))) => {
|
|
||||||
self.dialers.push(new_dialer);
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(None)) | Err(_) => {
|
|
||||||
// New dialers sender has been closed.
|
|
||||||
break
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match self.new_toprocess.poll() {
|
|
||||||
Ok(Async::Ready(Some(new_toprocess))) => {
|
|
||||||
self.to_process.push(future::Either::B(new_toprocess));
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(None)) | Err(_) => {
|
|
||||||
// New to-process sender has been closed.
|
|
||||||
break
|
|
||||||
}
|
|
||||||
Ok(Async::NotReady) => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match self.listeners.poll() {
|
|
||||||
Ok(Async::Ready(Some((Some(upgrade), remaining)))) => {
|
|
||||||
trace!("Swarm received new connection on listener socket");
|
|
||||||
self.listeners_upgrade.push(upgrade);
|
|
||||||
self.listeners.push(remaining.into_future());
|
|
||||||
}
|
|
||||||
Err((err, _)) => {
|
|
||||||
debug!("Error in listener: {:?}", err);
|
|
||||||
break
|
|
||||||
}
|
|
||||||
_ => break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match self.listeners_upgrade.poll() {
|
|
||||||
Ok(Async::Ready(Some((output, client_addr)))) => {
|
|
||||||
debug!("Successfully upgraded incoming connection");
|
|
||||||
self.to_process.push(future::Either::A(
|
|
||||||
handler(output, client_addr).into_future(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
debug!("Error in listener upgrade: {:?}", err);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_ => break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match self.dialers.poll() {
|
|
||||||
Ok(Async::Ready(Some((output, notifier, addr)))) => {
|
|
||||||
trace!("Successfully upgraded dialed connection");
|
|
||||||
self.to_process
|
|
||||||
.push(future::Either::A(handler(output, addr).into_future()));
|
|
||||||
let _ = notifier.send(Ok(()));
|
|
||||||
}
|
|
||||||
Err(()) => break,
|
|
||||||
_ => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match self.to_process.poll() {
|
|
||||||
Ok(Async::Ready(Some(()))) => {
|
|
||||||
trace!("Future returned by swarm handler driven to completion");
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
debug!("Error in processing: {:?}", err);
|
|
||||||
}
|
|
||||||
_ => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: we never return `Ok(Ready)` because there's no way to know whether
|
|
||||||
// `next_incoming()` can produce anything more in the future
|
|
||||||
Ok(Async::NotReady)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use futures::future;
|
use futures::{Future, future};
|
||||||
use transport::DeniedTransport;
|
use rand;
|
||||||
|
use transport::{self, DeniedTransport, Transport};
|
||||||
|
use std::io::Error as IoError;
|
||||||
|
use std::sync::{atomic, Arc};
|
||||||
use swarm;
|
use swarm;
|
||||||
|
use tokio::runtime::current_thread;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn transport_error_propagation_listen() {
|
fn transport_error_propagation_listen() {
|
||||||
@ -360,4 +417,74 @@ mod tests {
|
|||||||
let addr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap();
|
let addr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap();
|
||||||
assert!(swarm_ctrl.dial(addr, DeniedTransport).is_err());
|
assert!(swarm_ctrl.dial(addr, DeniedTransport).is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn basic_dial() {
|
||||||
|
let (tx, rx) = transport::connector();
|
||||||
|
|
||||||
|
let reached_tx = Arc::new(atomic::AtomicBool::new(false));
|
||||||
|
let reached_tx2 = reached_tx.clone();
|
||||||
|
|
||||||
|
let reached_rx = Arc::new(atomic::AtomicBool::new(false));
|
||||||
|
let reached_rx2 = reached_rx.clone();
|
||||||
|
|
||||||
|
let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), |_, _| {
|
||||||
|
reached_rx2.store(true, atomic::Ordering::SeqCst);
|
||||||
|
future::empty()
|
||||||
|
});
|
||||||
|
swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap();
|
||||||
|
|
||||||
|
let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), |_, _| {
|
||||||
|
reached_tx2.store(true, atomic::Ordering::SeqCst);
|
||||||
|
future::empty()
|
||||||
|
});
|
||||||
|
|
||||||
|
let dial_success = swarm_ctrl2.dial("/memory".parse().unwrap(), tx).unwrap();
|
||||||
|
let future = swarm_future2
|
||||||
|
.select(swarm_future1).map(|_| ()).map_err(|(err, _)| err)
|
||||||
|
.select(dial_success).map(|_| ()).map_err(|(err, _)| err);
|
||||||
|
|
||||||
|
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
|
||||||
|
assert!(reached_tx.load(atomic::Ordering::SeqCst));
|
||||||
|
assert!(reached_rx.load(atomic::Ordering::SeqCst));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dial_multiple_times() {
|
||||||
|
let (tx, rx) = transport::connector();
|
||||||
|
let reached = Arc::new(atomic::AtomicUsize::new(0));
|
||||||
|
let reached2 = reached.clone();
|
||||||
|
let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| {
|
||||||
|
reached2.fetch_add(1, atomic::Ordering::SeqCst);
|
||||||
|
future::empty()
|
||||||
|
});
|
||||||
|
swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap();
|
||||||
|
let num_dials = 20000 + rand::random::<usize>() % 20000;
|
||||||
|
let mut dials = Vec::new();
|
||||||
|
for _ in 0 .. num_dials {
|
||||||
|
let f = swarm_ctrl.dial("/memory".parse().unwrap(), tx.clone()).unwrap();
|
||||||
|
dials.push(f);
|
||||||
|
}
|
||||||
|
let future = future::join_all(dials)
|
||||||
|
.map(|_| ())
|
||||||
|
.select(swarm_future)
|
||||||
|
.map_err(|(err, _)| err);
|
||||||
|
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
|
||||||
|
assert_eq!(reached.load(atomic::Ordering::SeqCst), num_dials);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn future_isnt_dropped() {
|
||||||
|
// Tests that the future in the closure isn't being dropped.
|
||||||
|
let (tx, rx) = transport::connector();
|
||||||
|
let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| {
|
||||||
|
future::empty()
|
||||||
|
.then(|_: Result<(), ()>| -> Result<(), IoError> { panic!() }) // <-- the test
|
||||||
|
});
|
||||||
|
swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap();
|
||||||
|
let dial_success = swarm_ctrl.dial("/memory".parse().unwrap(), tx).unwrap();
|
||||||
|
let future = dial_success.select(swarm_future)
|
||||||
|
.map_err(|(err, _)| err);
|
||||||
|
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
121
core/src/transport/interruptible.rs
Normal file
121
core/src/transport/interruptible.rs
Normal file
@ -0,0 +1,121 @@
|
|||||||
|
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
use futures::{future, prelude::*, sync::oneshot};
|
||||||
|
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||||
|
use transport::{MuxedTransport, Transport};
|
||||||
|
use Multiaddr;
|
||||||
|
|
||||||
|
/// See `Transport::interruptible`.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Interruptible<T> {
|
||||||
|
transport: T,
|
||||||
|
rx: future::Shared<oneshot::Receiver<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Interruptible<T> {
|
||||||
|
/// Internal function that builds an `Interruptible`.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn new(transport: T) -> (Interruptible<T>, Interrupt) {
|
||||||
|
let (_tx, rx) = oneshot::channel();
|
||||||
|
let transport = Interruptible { transport, rx: rx.shared() };
|
||||||
|
let int = Interrupt { _tx };
|
||||||
|
(transport, int)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Transport for Interruptible<T>
|
||||||
|
where
|
||||||
|
T: Transport,
|
||||||
|
{
|
||||||
|
type Output = T::Output;
|
||||||
|
type MultiaddrFuture = T::MultiaddrFuture;
|
||||||
|
type Listener = T::Listener;
|
||||||
|
type ListenerUpgrade = T::ListenerUpgrade;
|
||||||
|
type Dial = InterruptibleDial<T::Dial>;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
|
match self.transport.listen_on(addr) {
|
||||||
|
Ok(val) => Ok(val),
|
||||||
|
Err((transport, addr)) => Err((Interruptible { transport, rx: self.rx }, addr)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||||
|
match self.transport.dial(addr) {
|
||||||
|
Ok(future) => {
|
||||||
|
Ok(InterruptibleDial {
|
||||||
|
inner: future,
|
||||||
|
rx: self.rx,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
Err((transport, addr)) => Err((Interruptible { transport, rx: self.rx }, addr)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
|
||||||
|
self.transport.nat_traversal(server, observed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> MuxedTransport for Interruptible<T>
|
||||||
|
where
|
||||||
|
T: MuxedTransport,
|
||||||
|
{
|
||||||
|
type Incoming = T::Incoming;
|
||||||
|
type IncomingUpgrade = T::IncomingUpgrade;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn next_incoming(self) -> Self::Incoming {
|
||||||
|
self.transport.next_incoming()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dropping this object interrupts the dialing of the corresponding `Interruptible`.
|
||||||
|
pub struct Interrupt {
|
||||||
|
_tx: oneshot::Sender<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct InterruptibleDial<F> {
|
||||||
|
inner: F,
|
||||||
|
rx: future::Shared<oneshot::Receiver<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F> Future for InterruptibleDial<F>
|
||||||
|
where F: Future<Error = IoError>
|
||||||
|
{
|
||||||
|
type Item = F::Item;
|
||||||
|
type Error = IoError;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
match self.rx.poll() {
|
||||||
|
Ok(Async::Ready(_)) | Err(_) => {
|
||||||
|
return Err(IoError::new(IoErrorKind::ConnectionAborted, "connection interrupted"));
|
||||||
|
},
|
||||||
|
Ok(Async::NotReady) => (),
|
||||||
|
};
|
||||||
|
|
||||||
|
self.inner.poll()
|
||||||
|
}
|
||||||
|
}
|
@ -39,6 +39,7 @@ pub mod and_then;
|
|||||||
pub mod choice;
|
pub mod choice;
|
||||||
pub mod denied;
|
pub mod denied;
|
||||||
pub mod dummy;
|
pub mod dummy;
|
||||||
|
pub mod interruptible;
|
||||||
pub mod map;
|
pub mod map;
|
||||||
pub mod map_err;
|
pub mod map_err;
|
||||||
pub mod memory;
|
pub mod memory;
|
||||||
@ -192,4 +193,13 @@ pub trait Transport {
|
|||||||
{
|
{
|
||||||
DummyMuxing::new(self)
|
DummyMuxing::new(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wraps around the `Transport` and makes it interruptible.
|
||||||
|
#[inline]
|
||||||
|
fn interruptible(self) -> (interruptible::Interruptible<Self>, interruptible::Interrupt)
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
interruptible::Interruptible::new(self)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,12 +18,14 @@
|
|||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
use fnv::FnvHashMap;
|
||||||
use futures::{future, sync::oneshot, task, Async, Future, Poll, IntoFuture};
|
use futures::{future, sync::oneshot, task, Async, Future, Poll, IntoFuture};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use {Multiaddr, MuxedTransport, SwarmController, Transport};
|
use {Multiaddr, MuxedTransport, SwarmController, Transport};
|
||||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak, atomic::AtomicUsize, atomic::Ordering};
|
||||||
|
use transport::interruptible::Interrupt;
|
||||||
|
|
||||||
/// Storage for a unique connection with a remote.
|
/// Storage for a unique connection with a remote.
|
||||||
pub struct UniqueConnec<T> {
|
pub struct UniqueConnec<T> {
|
||||||
@ -36,10 +38,13 @@ enum UniqueConnecInner<T> {
|
|||||||
/// We started dialing, but no response has been obtained so far.
|
/// We started dialing, but no response has been obtained so far.
|
||||||
Pending {
|
Pending {
|
||||||
/// Tasks that need to be awakened when the content of this object is set.
|
/// Tasks that need to be awakened when the content of this object is set.
|
||||||
tasks_waiting: Vec<task::Task>,
|
tasks_waiting: FnvHashMap<usize, task::Task>,
|
||||||
/// Future that represents when `set_until` should have been called.
|
/// Future that represents when `tie_*` should have been called.
|
||||||
// TODO: Send + Sync bound is meh
|
// TODO: Send + Sync bound is meh
|
||||||
dial_fut: Box<Future<Item = (), Error = IoError> + Send + Sync>,
|
dial_fut: Box<Future<Item = (), Error = IoError> + Send + Sync>,
|
||||||
|
/// Dropping this object will automatically interrupt the dial, which is very useful if
|
||||||
|
/// we clear or drop the `UniqueConnec`.
|
||||||
|
interrupt: Interrupt,
|
||||||
},
|
},
|
||||||
/// The value of this unique connec has been set.
|
/// The value of this unique connec has been set.
|
||||||
/// Can only transition to `Empty` when the future has expired.
|
/// Can only transition to `Empty` when the future has expired.
|
||||||
@ -85,69 +90,127 @@ impl<T> UniqueConnec<T> {
|
|||||||
|
|
||||||
/// Loads the value from the object.
|
/// Loads the value from the object.
|
||||||
///
|
///
|
||||||
/// If the object is empty, dials the given multiaddress with the given transport.
|
/// If the object is empty or has errored earlier, dials the given multiaddress with the
|
||||||
|
/// given transport.
|
||||||
///
|
///
|
||||||
/// The closure of the `swarm` is expected to call `set_until()` on the `UniqueConnec`. Failure
|
/// The closure of the `swarm` is expected to call `tie_*()` on the `UniqueConnec`. Failure
|
||||||
/// to do so will make the `UniqueConnecFuture` produce an error.
|
/// to do so will make the `UniqueConnecFuture` produce an error.
|
||||||
pub fn get_or_dial<S, Du>(&self, swarm: &SwarmController<S>, multiaddr: &Multiaddr,
|
///
|
||||||
|
/// One critical property of this method, is that if a connection incomes and `tie_*` is
|
||||||
|
/// called, then it will be returned by the returned future.
|
||||||
|
#[inline]
|
||||||
|
pub fn dial<S, Du>(&self, swarm: &SwarmController<S>, multiaddr: &Multiaddr,
|
||||||
transport: Du) -> UniqueConnecFuture<T>
|
transport: Du) -> UniqueConnecFuture<T>
|
||||||
where T: Clone,
|
where T: Clone + 'static, // TODO: 'static :-/
|
||||||
Du: Transport + 'static, // TODO: 'static :-/
|
Du: Transport + 'static, // TODO: 'static :-/
|
||||||
Du::Output: Into<S::Output>,
|
Du::Output: Into<S::Output>,
|
||||||
S: Clone + MuxedTransport,
|
S: Clone + MuxedTransport,
|
||||||
{
|
{
|
||||||
self.get(|| {
|
self.dial_inner(swarm, multiaddr, transport, true)
|
||||||
swarm.dial(multiaddr.clone(), transport)
|
|
||||||
.map_err(|_| IoError::new(IoErrorKind::Other, "multiaddress not supported"))
|
|
||||||
.into_future()
|
|
||||||
.flatten()
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Loads the value from the object.
|
/// Same as `dial`, except that the future will produce an error if an earlier attempt to dial
|
||||||
///
|
/// has errored.
|
||||||
/// If the object is empty, calls the closure. The closure should return a future that
|
#[inline]
|
||||||
/// should be signaled after `set_until` has been called. If the future produces an error,
|
pub fn dial_if_empty<S, Du>(&self, swarm: &SwarmController<S>, multiaddr: &Multiaddr,
|
||||||
/// then the object will empty itself again and the `UniqueConnecFuture` will return an error.
|
transport: Du) -> UniqueConnecFuture<T>
|
||||||
/// If the future is finished and `set_until` hasn't been called, then the `UniqueConnecFuture`
|
where T: Clone + 'static, // TODO: 'static :-/
|
||||||
/// will return an error.
|
Du: Transport + 'static, // TODO: 'static :-/
|
||||||
pub fn get<F, Fut>(&self, or: F) -> UniqueConnecFuture<T>
|
Du::Output: Into<S::Output>,
|
||||||
where F: FnOnce() -> Fut,
|
S: Clone + MuxedTransport,
|
||||||
T: Clone,
|
|
||||||
Fut: IntoFuture<Item = (), Error = IoError>,
|
|
||||||
Fut::Future: Send + Sync + 'static, // TODO: 'static :-/
|
|
||||||
{
|
{
|
||||||
match &*self.inner.lock() {
|
self.dial_inner(swarm, multiaddr, transport, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Inner implementation of `dial_*`.
|
||||||
|
fn dial_inner<S, Du>(&self, swarm: &SwarmController<S>, multiaddr: &Multiaddr,
|
||||||
|
transport: Du, dial_if_err: bool) -> UniqueConnecFuture<T>
|
||||||
|
where T: Clone + 'static, // TODO: 'static :-/
|
||||||
|
Du: Transport + 'static, // TODO: 'static :-/
|
||||||
|
Du::Output: Into<S::Output>,
|
||||||
|
S: Clone + MuxedTransport,
|
||||||
|
{
|
||||||
|
let mut inner = self.inner.lock();
|
||||||
|
match &*inner {
|
||||||
UniqueConnecInner::Empty => (),
|
UniqueConnecInner::Empty => (),
|
||||||
|
UniqueConnecInner::Errored(_) if dial_if_err => (),
|
||||||
_ => return UniqueConnecFuture { inner: Arc::downgrade(&self.inner) },
|
_ => return UniqueConnecFuture { inner: Arc::downgrade(&self.inner) },
|
||||||
};
|
};
|
||||||
|
|
||||||
// The mutex is unlocked when we call `or`, in order to avoid potential deadlocks.
|
let weak_inner = Arc::downgrade(&self.inner);
|
||||||
let dial_fut = or().into_future();
|
|
||||||
|
|
||||||
let mut inner = self.inner.lock();
|
let (transport, interrupt) = transport.interruptible();
|
||||||
// Since we unlocked the mutex, it's possible that the object was filled in the meanwhile.
|
let dial_fut = swarm.dial_then(multiaddr.clone(), transport,
|
||||||
// Therefore we check again whether it's still `Empty`.
|
move |val: Result<(), IoError>| {
|
||||||
if let UniqueConnecInner::Empty = &mut *inner {
|
let inner = match weak_inner.upgrade() {
|
||||||
*inner = UniqueConnecInner::Pending {
|
Some(i) => i,
|
||||||
tasks_waiting: Vec::new(),
|
None => return val
|
||||||
dial_fut: Box::new(dial_fut),
|
};
|
||||||
};
|
|
||||||
}
|
let mut inner = inner.lock();
|
||||||
|
if let UniqueConnecInner::Full { .. } = *inner {
|
||||||
|
return val;
|
||||||
|
}
|
||||||
|
|
||||||
|
let new_val = UniqueConnecInner::Errored(match val {
|
||||||
|
Ok(()) => IoError::new(IoErrorKind::ConnectionRefused,
|
||||||
|
"dialing has succeeded but tie_* hasn't been called"),
|
||||||
|
Err(ref err) => IoError::new(err.kind(), err.to_string()),
|
||||||
|
});
|
||||||
|
|
||||||
|
match mem::replace(&mut *inner, new_val) {
|
||||||
|
UniqueConnecInner::Pending { tasks_waiting, .. } => {
|
||||||
|
for task in tasks_waiting {
|
||||||
|
task.1.notify();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => ()
|
||||||
|
};
|
||||||
|
|
||||||
|
val
|
||||||
|
});
|
||||||
|
|
||||||
|
let dial_fut = dial_fut
|
||||||
|
.map_err(|_| IoError::new(IoErrorKind::Other, "multiaddress not supported"))
|
||||||
|
.into_future()
|
||||||
|
.flatten();
|
||||||
|
|
||||||
|
*inner = UniqueConnecInner::Pending {
|
||||||
|
tasks_waiting: Default::default(),
|
||||||
|
dial_fut: Box::new(dial_fut),
|
||||||
|
interrupt,
|
||||||
|
};
|
||||||
|
|
||||||
UniqueConnecFuture { inner: Arc::downgrade(&self.inner) }
|
UniqueConnecFuture { inner: Arc::downgrade(&self.inner) }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Puts `value` inside the object. The second parameter is a future whose completion will
|
/// Puts `value` inside the object.
|
||||||
/// clear up the content. Returns an adjusted version of that same future.
|
/// Additionally, the `UniqueConnec` will be tied to the `until` future. When the future drops
|
||||||
|
/// or finishes, the `UniqueConnec` is automatically cleared. If the `UniqueConnec` is cleared
|
||||||
|
/// by the user, the future automatically stops.
|
||||||
|
/// The returned future is an adjusted version of that same future.
|
||||||
///
|
///
|
||||||
/// If `clear()` is called, the returned future will automatically complete with an error.
|
/// If the object already contains something, then `until` is dropped and a dummy future that
|
||||||
///
|
/// immediately ends is returned.
|
||||||
/// Has no effect if the object already contains something.
|
pub fn tie_or_stop<F>(&self, value: T, until: F) -> impl Future<Item = (), Error = F::Error>
|
||||||
pub fn set_until<F>(&self, value: T, until: F) -> impl Future<Item = (), Error = F::Error>
|
|
||||||
where F: Future<Item = ()>
|
where F: Future<Item = ()>
|
||||||
{
|
{
|
||||||
let mut tasks_to_notify = Vec::new();
|
self.tie_inner(value, until, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Same as `tie_or_stop`, except that is if the object already contains something, then
|
||||||
|
/// `until` is returned immediately and can live in parallel.
|
||||||
|
pub fn tie_or_passthrough<F>(&self, value: T, until: F) -> impl Future<Item = (), Error = F::Error>
|
||||||
|
where F: Future<Item = ()>
|
||||||
|
{
|
||||||
|
self.tie_inner(value, until, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Inner implementation of `tie_*`.
|
||||||
|
fn tie_inner<F>(&self, value: T, until: F, pass_through: bool) -> impl Future<Item = (), Error = F::Error>
|
||||||
|
where F: Future<Item = ()>
|
||||||
|
{
|
||||||
|
let mut tasks_to_notify = Default::default();
|
||||||
|
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
let (on_clear, on_clear_rx) = oneshot::channel();
|
let (on_clear, on_clear_rx) = oneshot::channel();
|
||||||
@ -160,23 +223,37 @@ impl<T> UniqueConnec<T> {
|
|||||||
old @ UniqueConnecInner::Full { .. } => {
|
old @ UniqueConnecInner::Full { .. } => {
|
||||||
// Keep the old value.
|
// Keep the old value.
|
||||||
*inner = old;
|
*inner = old;
|
||||||
return future::Either::B(until);
|
if pass_through {
|
||||||
|
return future::Either::B(future::Either::A(until));
|
||||||
|
} else {
|
||||||
|
return future::Either::B(future::Either::B(future::ok(())));
|
||||||
|
}
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
drop(inner);
|
drop(inner);
|
||||||
|
|
||||||
|
struct Cleaner<T>(Weak<Mutex<UniqueConnecInner<T>>>);
|
||||||
|
impl<T> Drop for Cleaner<T> {
|
||||||
|
#[inline]
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(inner) = self.0.upgrade() {
|
||||||
|
*inner.lock() = UniqueConnecInner::Empty;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let cleaner = Cleaner(Arc::downgrade(&self.inner));
|
||||||
|
|
||||||
// The mutex is unlocked when we notify the pending tasks.
|
// The mutex is unlocked when we notify the pending tasks.
|
||||||
for task in tasks_to_notify {
|
for task in tasks_to_notify {
|
||||||
task.notify();
|
task.1.notify();
|
||||||
}
|
}
|
||||||
|
|
||||||
let inner = self.inner.clone();
|
|
||||||
let fut = until
|
let fut = until
|
||||||
.select(on_clear_rx.then(|_| Ok(())))
|
.select(on_clear_rx.then(|_| Ok(())))
|
||||||
.map(|((), _)| ())
|
.map(|((), _)| ())
|
||||||
.map_err(|(err, _)| err)
|
.map_err(|(err, _)| err)
|
||||||
.then(move |val| {
|
.then(move |val| {
|
||||||
*inner.lock() = UniqueConnecInner::Empty;
|
drop(cleaner); // Make sure that `cleaner` gets called there.
|
||||||
val
|
val
|
||||||
});
|
});
|
||||||
future::Either::A(fut)
|
future::Either::A(fut)
|
||||||
@ -185,7 +262,7 @@ impl<T> UniqueConnec<T> {
|
|||||||
/// Clears the content of the object.
|
/// Clears the content of the object.
|
||||||
///
|
///
|
||||||
/// Has no effect if the content is empty or pending.
|
/// Has no effect if the content is empty or pending.
|
||||||
/// If the node was full, calling `clear` will stop the future returned by `set_until`.
|
/// If the node was full, calling `clear` will stop the future returned by `tie_*`.
|
||||||
pub fn clear(&self) {
|
pub fn clear(&self) {
|
||||||
let mut inner = self.inner.lock();
|
let mut inner = self.inner.lock();
|
||||||
match mem::replace(&mut *inner, UniqueConnecInner::Empty) {
|
match mem::replace(&mut *inner, UniqueConnecInner::Empty) {
|
||||||
@ -195,6 +272,12 @@ impl<T> UniqueConnec<T> {
|
|||||||
*inner = pending;
|
*inner = pending;
|
||||||
},
|
},
|
||||||
UniqueConnecInner::Full { on_clear, .. } => {
|
UniqueConnecInner::Full { on_clear, .. } => {
|
||||||
|
// TODO: Should we really replace the `Full` with an `Empty` here? What about
|
||||||
|
// letting dropping the future clear the connection automatically? Otherwise
|
||||||
|
// it is possible that the user dials before the future gets dropped, in which
|
||||||
|
// case the future dropping will set the value to `Empty`. But on the other hand,
|
||||||
|
// it is expected that `clear()` is instantaneous and if it is followed with
|
||||||
|
// `dial()` then it should dial.
|
||||||
let _ = on_clear.send(());
|
let _ = on_clear.send(());
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
@ -242,7 +325,23 @@ impl<T> Default for UniqueConnec<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Future returned by `UniqueConnec::get()`.
|
impl<T> Drop for UniqueConnec<T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// Notify the waiting futures if we are the last `UniqueConnec`.
|
||||||
|
if let Some(inner) = Arc::get_mut(&mut self.inner) {
|
||||||
|
match *inner.get_mut() {
|
||||||
|
UniqueConnecInner::Pending { ref mut tasks_waiting, .. } => {
|
||||||
|
for task in tasks_waiting.drain() {
|
||||||
|
task.1.notify();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => ()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Future returned by `UniqueConnec::dial()`.
|
||||||
pub struct UniqueConnecFuture<T> {
|
pub struct UniqueConnecFuture<T> {
|
||||||
inner: Weak<Mutex<UniqueConnecInner<T>>>,
|
inner: Weak<Mutex<UniqueConnecInner<T>>>,
|
||||||
}
|
}
|
||||||
@ -263,28 +362,32 @@ impl<T> Future for UniqueConnecFuture<T>
|
|||||||
let mut inner = inner.lock();
|
let mut inner = inner.lock();
|
||||||
match mem::replace(&mut *inner, UniqueConnecInner::Empty) {
|
match mem::replace(&mut *inner, UniqueConnecInner::Empty) {
|
||||||
UniqueConnecInner::Empty => {
|
UniqueConnecInner::Empty => {
|
||||||
// This can happen if `set_until()` is called, and the future expires before the
|
// This can happen if `tie_*()` is called, and the future expires before the
|
||||||
// future returned by `get()` gets polled. This means that the connection has been
|
// future returned by `dial()` gets polled. This means that the connection has been
|
||||||
// closed.
|
// closed.
|
||||||
Err(IoErrorKind::ConnectionAborted.into())
|
Err(IoErrorKind::ConnectionAborted.into())
|
||||||
},
|
},
|
||||||
UniqueConnecInner::Pending { mut tasks_waiting, mut dial_fut } => {
|
UniqueConnecInner::Pending { mut tasks_waiting, mut dial_fut, interrupt } => {
|
||||||
match dial_fut.poll() {
|
match dial_fut.poll() {
|
||||||
Ok(Async::Ready(())) => {
|
Ok(Async::Ready(())) => {
|
||||||
// This happens if we successfully dialed a remote, but the callback
|
// This happens if we successfully dialed a remote, but the callback
|
||||||
// doesn't call `set_until`. This can be a logic error by the user,
|
// doesn't call `tie_*`. This can be a logic error by the user,
|
||||||
// but could also indicate that the user decided to filter out this
|
// but could also indicate that the user decided to filter out this
|
||||||
// connection for whatever reason.
|
// connection for whatever reason.
|
||||||
*inner = UniqueConnecInner::Errored(IoErrorKind::ConnectionAborted.into());
|
*inner = UniqueConnecInner::Errored(IoErrorKind::ConnectionAborted.into());
|
||||||
Err(IoErrorKind::ConnectionAborted.into())
|
Err(IoErrorKind::ConnectionAborted.into())
|
||||||
},
|
},
|
||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => {
|
||||||
tasks_waiting.push(task::current());
|
static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0);
|
||||||
*inner = UniqueConnecInner::Pending { tasks_waiting, dial_fut };
|
task_local! {
|
||||||
|
static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
tasks_waiting.insert(TASK_ID.with(|&k| k), task::current());
|
||||||
|
*inner = UniqueConnecInner::Pending { tasks_waiting, dial_fut, interrupt };
|
||||||
Ok(Async::NotReady)
|
Ok(Async::NotReady)
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let tr = IoError::new(IoErrorKind::ConnectionAborted, err.to_string());
|
let tr = IoError::new(err.kind(), err.to_string());
|
||||||
*inner = UniqueConnecInner::Errored(err);
|
*inner = UniqueConnecInner::Errored(err);
|
||||||
Err(tr)
|
Err(tr)
|
||||||
},
|
},
|
||||||
@ -298,7 +401,7 @@ impl<T> Future for UniqueConnecFuture<T>
|
|||||||
Ok(Async::Ready(value))
|
Ok(Async::Ready(value))
|
||||||
},
|
},
|
||||||
UniqueConnecInner::Errored(err) => {
|
UniqueConnecInner::Errored(err) => {
|
||||||
let tr = IoError::new(IoErrorKind::ConnectionAborted, err.to_string());
|
let tr = IoError::new(err.kind(), err.to_string());
|
||||||
*inner = UniqueConnecInner::Errored(err);
|
*inner = UniqueConnecInner::Errored(err);
|
||||||
Err(tr)
|
Err(tr)
|
||||||
},
|
},
|
||||||
@ -311,35 +414,321 @@ impl<T> Future for UniqueConnecFuture<T>
|
|||||||
pub enum UniqueConnecState {
|
pub enum UniqueConnecState {
|
||||||
/// The object is empty.
|
/// The object is empty.
|
||||||
Empty,
|
Empty,
|
||||||
/// `get_*` has been called and we are waiting for `set_until` to be called.
|
/// `dial` has been called and we are waiting for `tie_*` to be called.
|
||||||
Pending,
|
Pending,
|
||||||
/// `set_until` has been called.
|
/// `tie_*` has been called.
|
||||||
Full,
|
Full,
|
||||||
/// The future returned by the closure of `get_*` has errored or has finished before
|
/// The future returned by the closure of `dial` has errored or has finished before
|
||||||
/// `set_until` has been called.
|
/// `tie_*` has been called.
|
||||||
Errored,
|
Errored,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use futures::{future, Future};
|
use futures::{future, sync::oneshot, Future};
|
||||||
use transport::DeniedTransport;
|
use transport::DeniedTransport;
|
||||||
|
use std::io::Error as IoError;
|
||||||
|
use std::sync::{Arc, atomic};
|
||||||
|
use std::time::Duration;
|
||||||
use {UniqueConnec, UniqueConnecState};
|
use {UniqueConnec, UniqueConnecState};
|
||||||
use swarm;
|
use {swarm, transport, Transport};
|
||||||
|
use tokio::runtime::current_thread;
|
||||||
|
use tokio_timer;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn basic_working() {
|
||||||
|
// Checks the basic working of the `UniqueConnec`.
|
||||||
|
let (tx, rx) = transport::connector();
|
||||||
|
let unique_connec = UniqueConnec::empty();
|
||||||
|
let unique_connec2 = unique_connec.clone();
|
||||||
|
assert_eq!(unique_connec.state(), UniqueConnecState::Empty);
|
||||||
|
|
||||||
|
let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), |_, _| {
|
||||||
|
// Note that this handles both the dial and the listen.
|
||||||
|
assert!(unique_connec2.is_alive());
|
||||||
|
unique_connec2.tie_or_stop(12, future::empty())
|
||||||
|
});
|
||||||
|
swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap();
|
||||||
|
|
||||||
|
let dial_success = unique_connec
|
||||||
|
.dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx)
|
||||||
|
.map(|val| { assert_eq!(val, 12); });
|
||||||
|
assert_eq!(unique_connec.state(), UniqueConnecState::Pending);
|
||||||
|
|
||||||
|
let future = dial_success.select(swarm_future).map_err(|(err, _)| err);
|
||||||
|
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
|
||||||
|
assert_eq!(unique_connec.state(), UniqueConnecState::Full);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn invalid_multiaddr_produces_error() {
|
fn invalid_multiaddr_produces_error() {
|
||||||
|
// Tests that passing an invalid multiaddress generates an error.
|
||||||
let unique = UniqueConnec::empty();
|
let unique = UniqueConnec::empty();
|
||||||
assert_eq!(unique.state(), UniqueConnecState::Empty);
|
assert_eq!(unique.state(), UniqueConnecState::Empty);
|
||||||
let unique2 = unique.clone();
|
let unique2 = unique.clone();
|
||||||
let (swarm_ctrl, _swarm_fut) = swarm(DeniedTransport, |_, _| {
|
let (swarm_ctrl, _swarm_fut) = swarm(DeniedTransport, |_, _| {
|
||||||
unique2.set_until((), future::empty())
|
unique2.tie_or_stop((), future::empty())
|
||||||
});
|
});
|
||||||
let fut = unique.get_or_dial(&swarm_ctrl, &"/ip4/1.2.3.4".parse().unwrap(),
|
let fut = unique.dial(&swarm_ctrl, &"/ip4/1.2.3.4".parse().unwrap(), DeniedTransport);
|
||||||
DeniedTransport);
|
|
||||||
assert!(fut.wait().is_err());
|
assert!(fut.wait().is_err());
|
||||||
assert_eq!(unique.state(), UniqueConnecState::Errored);
|
assert_eq!(unique.state(), UniqueConnecState::Errored);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: more tests
|
#[test]
|
||||||
|
fn tie_or_stop_stops() {
|
||||||
|
// Tests that `tie_or_stop` destroys additional futures passed to it.
|
||||||
|
let (tx, rx) = transport::connector();
|
||||||
|
let unique_connec = UniqueConnec::empty();
|
||||||
|
let unique_connec2 = unique_connec.clone();
|
||||||
|
|
||||||
|
// This channel is used to detect whether the future has been dropped.
|
||||||
|
let (msg_tx, msg_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let mut num_connec = 0;
|
||||||
|
let mut msg_rx = Some(msg_rx);
|
||||||
|
let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| {
|
||||||
|
num_connec += 1;
|
||||||
|
if num_connec == 1 {
|
||||||
|
unique_connec2.tie_or_stop(12, future::Either::A(future::empty()))
|
||||||
|
} else {
|
||||||
|
let fut = msg_rx.take().unwrap().map_err(|_| panic!());
|
||||||
|
unique_connec2.tie_or_stop(13, future::Either::B(fut))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap();
|
||||||
|
|
||||||
|
let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| {
|
||||||
|
future::empty()
|
||||||
|
});
|
||||||
|
|
||||||
|
let dial_success = unique_connec
|
||||||
|
.dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx.clone())
|
||||||
|
.map(|val| { assert_eq!(val, 12); })
|
||||||
|
.inspect({
|
||||||
|
let c = unique_connec.clone();
|
||||||
|
move |_| { assert!(c.is_alive()); }
|
||||||
|
})
|
||||||
|
.and_then(|_| {
|
||||||
|
tokio_timer::sleep(Duration::from_secs(1))
|
||||||
|
.map_err(|_| unreachable!())
|
||||||
|
})
|
||||||
|
.and_then(move |_| {
|
||||||
|
swarm_ctrl2.dial("/memory".parse().unwrap(), tx)
|
||||||
|
.unwrap_or_else(|_| panic!())
|
||||||
|
})
|
||||||
|
.inspect({
|
||||||
|
let c = unique_connec.clone();
|
||||||
|
move |_| {
|
||||||
|
assert_eq!(c.poll(), Some(12)); // Not 13
|
||||||
|
assert!(msg_tx.send(()).is_err());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let future = dial_success
|
||||||
|
.select(swarm_future2).map(|_| ()).map_err(|(err, _)| err)
|
||||||
|
.select(swarm_future1).map(|_| ()).map_err(|(err, _)| err);
|
||||||
|
|
||||||
|
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
|
||||||
|
assert!(unique_connec.is_alive());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn tie_or_passthrough_passes_through() {
|
||||||
|
// Tests that `tie_or_passthrough` doesn't delete additional futures passed to it when
|
||||||
|
// it is already full, and doesn't gets its value modified when that happens.
|
||||||
|
let (tx, rx) = transport::connector();
|
||||||
|
let unique_connec = UniqueConnec::empty();
|
||||||
|
let unique_connec2 = unique_connec.clone();
|
||||||
|
|
||||||
|
let mut num = 12;
|
||||||
|
let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), move |_, _| {
|
||||||
|
// Note that this handles both the dial and the listen.
|
||||||
|
let fut = future::empty().then(|_: Result<(), ()>| -> Result<(), IoError> { panic!() });
|
||||||
|
num += 1;
|
||||||
|
unique_connec2.tie_or_passthrough(num, fut)
|
||||||
|
});
|
||||||
|
swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap();
|
||||||
|
|
||||||
|
let dial_success = unique_connec
|
||||||
|
.dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx.clone())
|
||||||
|
.map(|val| { assert_eq!(val, 13); });
|
||||||
|
|
||||||
|
swarm_ctrl.dial("/memory".parse().unwrap(), tx)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let future = dial_success.select(swarm_future).map_err(|(err, _)| err);
|
||||||
|
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
|
||||||
|
assert_eq!(unique_connec.poll(), Some(13));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cleared_when_future_drops() {
|
||||||
|
// Tests that the `UniqueConnec` gets cleared when the future we associate with it gets
|
||||||
|
// destroyed.
|
||||||
|
let (tx, rx) = transport::connector();
|
||||||
|
let unique_connec = UniqueConnec::empty();
|
||||||
|
let unique_connec2 = unique_connec.clone();
|
||||||
|
|
||||||
|
let (msg_tx, msg_rx) = oneshot::channel();
|
||||||
|
let mut msg_rx = Some(msg_rx);
|
||||||
|
|
||||||
|
let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| {
|
||||||
|
future::empty()
|
||||||
|
});
|
||||||
|
swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap();
|
||||||
|
|
||||||
|
let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| {
|
||||||
|
let fut = msg_rx.take().unwrap().map_err(|_| -> IoError { unreachable!() });
|
||||||
|
unique_connec2.tie_or_stop(12, fut)
|
||||||
|
});
|
||||||
|
|
||||||
|
let dial_success = unique_connec
|
||||||
|
.dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx)
|
||||||
|
.map(|val| { assert_eq!(val, 12); })
|
||||||
|
.inspect({
|
||||||
|
let c = unique_connec.clone();
|
||||||
|
move |_| { assert!(c.is_alive()); }
|
||||||
|
})
|
||||||
|
.and_then(|_| {
|
||||||
|
msg_tx.send(()).unwrap();
|
||||||
|
tokio_timer::sleep(Duration::from_secs(1))
|
||||||
|
.map_err(|_| unreachable!())
|
||||||
|
})
|
||||||
|
.inspect({
|
||||||
|
let c = unique_connec.clone();
|
||||||
|
move |_| { assert!(!c.is_alive()); }
|
||||||
|
});
|
||||||
|
|
||||||
|
let future = dial_success
|
||||||
|
.select(swarm_future1).map(|_| ()).map_err(|(err, _)| err)
|
||||||
|
.select(swarm_future2).map(|_| ()).map_err(|(err, _)| err);
|
||||||
|
|
||||||
|
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
|
||||||
|
assert!(!unique_connec.is_alive());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn future_drops_when_cleared() {
|
||||||
|
// Tests that the future returned by `tie_or_*` ends when the `UniqueConnec` get cleared.
|
||||||
|
let (tx, rx) = transport::connector();
|
||||||
|
let unique_connec = UniqueConnec::empty();
|
||||||
|
let unique_connec2 = unique_connec.clone();
|
||||||
|
|
||||||
|
let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| {
|
||||||
|
future::empty()
|
||||||
|
});
|
||||||
|
swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap();
|
||||||
|
|
||||||
|
let finished = Arc::new(atomic::AtomicBool::new(false));
|
||||||
|
let finished2 = finished.clone();
|
||||||
|
let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| {
|
||||||
|
let finished2 = finished2.clone();
|
||||||
|
unique_connec2.tie_or_stop(12, future::empty()).then(move |v| {
|
||||||
|
finished2.store(true, atomic::Ordering::Relaxed);
|
||||||
|
v
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let dial_success = unique_connec
|
||||||
|
.dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx)
|
||||||
|
.map(|val| { assert_eq!(val, 12); })
|
||||||
|
.inspect({
|
||||||
|
let c = unique_connec.clone();
|
||||||
|
move |_| {
|
||||||
|
assert!(c.is_alive());
|
||||||
|
c.clear();
|
||||||
|
assert!(!c.is_alive());
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.and_then(|_| {
|
||||||
|
tokio_timer::sleep(Duration::from_secs(1))
|
||||||
|
.map_err(|_| unreachable!())
|
||||||
|
})
|
||||||
|
.inspect({
|
||||||
|
let c = unique_connec.clone();
|
||||||
|
move |_| {
|
||||||
|
assert!(finished.load(atomic::Ordering::Relaxed));
|
||||||
|
assert!(!c.is_alive());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let future = dial_success
|
||||||
|
.select(swarm_future1).map(|_| ()).map_err(|(err, _)| err)
|
||||||
|
.select(swarm_future2).map(|_| ()).map_err(|(err, _)| err);
|
||||||
|
|
||||||
|
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
|
||||||
|
assert!(!unique_connec.is_alive());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn future_drops_when_destroyed() {
|
||||||
|
// Tests that the future returned by `tie_or_*` ends when the `UniqueConnec` get dropped.
|
||||||
|
let (tx, rx) = transport::connector();
|
||||||
|
let unique_connec = UniqueConnec::empty();
|
||||||
|
let mut unique_connec2 = Some(unique_connec.clone());
|
||||||
|
|
||||||
|
let (swarm_ctrl1, swarm_future1) = swarm(rx.with_dummy_muxing(), move |_, _| {
|
||||||
|
future::empty()
|
||||||
|
});
|
||||||
|
swarm_ctrl1.listen_on("/memory".parse().unwrap()).unwrap();
|
||||||
|
|
||||||
|
let finished = Arc::new(atomic::AtomicBool::new(false));
|
||||||
|
let finished2 = finished.clone();
|
||||||
|
let (swarm_ctrl2, swarm_future2) = swarm(tx.clone().with_dummy_muxing(), move |_, _| {
|
||||||
|
let finished2 = finished2.clone();
|
||||||
|
unique_connec2.take().unwrap().tie_or_stop(12, future::empty()).then(move |v| {
|
||||||
|
finished2.store(true, atomic::Ordering::Relaxed);
|
||||||
|
v
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let dial_success = unique_connec
|
||||||
|
.dial(&swarm_ctrl2, &"/memory".parse().unwrap(), tx)
|
||||||
|
.map(|val| { assert_eq!(val, 12); })
|
||||||
|
.inspect(move |_| {
|
||||||
|
assert!(unique_connec.is_alive());
|
||||||
|
drop(unique_connec);
|
||||||
|
})
|
||||||
|
.and_then(|_| {
|
||||||
|
tokio_timer::sleep(Duration::from_secs(1))
|
||||||
|
.map_err(|_| unreachable!())
|
||||||
|
})
|
||||||
|
.inspect(move |_| {
|
||||||
|
assert!(finished.load(atomic::Ordering::Relaxed));
|
||||||
|
});
|
||||||
|
|
||||||
|
let future = dial_success
|
||||||
|
.select(swarm_future1).map(|_| ()).map_err(|(err, _)| err)
|
||||||
|
.select(swarm_future2).map(|_| ()).map_err(|(err, _)| err);
|
||||||
|
|
||||||
|
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn error_if_unique_connec_destroyed_before_future() {
|
||||||
|
// Tests that the future returned by `dial` returns an error if the `UniqueConnec` no
|
||||||
|
// longer exists.
|
||||||
|
let (tx, rx) = transport::connector();
|
||||||
|
|
||||||
|
let (swarm_ctrl, swarm_future) = swarm(rx.with_dummy_muxing(), move |_, _| {
|
||||||
|
future::empty()
|
||||||
|
});
|
||||||
|
swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap();
|
||||||
|
|
||||||
|
let unique_connec = UniqueConnec::empty();
|
||||||
|
let dial_success = unique_connec
|
||||||
|
.dial(&swarm_ctrl, &"/memory".parse().unwrap(), tx)
|
||||||
|
.then(|val: Result<(), IoError>| {
|
||||||
|
assert!(val.is_err());
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
drop(unique_connec);
|
||||||
|
|
||||||
|
let future = dial_success
|
||||||
|
.select(swarm_future).map(|_| ()).map_err(|(err, _)| err);
|
||||||
|
current_thread::Runtime::new().unwrap().block_on(future).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: test that dialing is interrupted when UniqueConnec is cleared
|
||||||
|
// TODO: test that dialing is interrupted when UniqueConnec is dropped
|
||||||
}
|
}
|
||||||
|
@ -176,7 +176,7 @@ fn main() {
|
|||||||
active_kad_connections
|
active_kad_connections
|
||||||
.entry(node_id)
|
.entry(node_id)
|
||||||
.or_insert_with(Default::default)
|
.or_insert_with(Default::default)
|
||||||
.set_until(kad_ctrl, fut)
|
.tie_or_passthrough(kad_ctrl, fut)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -195,7 +195,7 @@ fn main() {
|
|||||||
let addr = Multiaddr::from(libp2p::multiaddr::AddrComponent::P2P(cid));
|
let addr = Multiaddr::from(libp2p::multiaddr::AddrComponent::P2P(cid));
|
||||||
active_kad_connections.lock().unwrap().entry(peer.clone())
|
active_kad_connections.lock().unwrap().entry(peer.clone())
|
||||||
.or_insert_with(Default::default)
|
.or_insert_with(Default::default)
|
||||||
.get_or_dial(&swarm_controller, &addr, transport.clone().with_upgrade(KadConnecConfig::new()))
|
.dial(&swarm_controller, &addr, transport.clone().with_upgrade(KadConnecConfig::new()))
|
||||||
})
|
})
|
||||||
.filter_map(move |event| {
|
.filter_map(move |event| {
|
||||||
match event {
|
match event {
|
||||||
|
Reference in New Issue
Block a user