SwarmController::dial now returns a Future (#290)

* SwarmController::dial now returns a Future

* Minor comment
This commit is contained in:
Pierre Krieger
2018-07-14 13:46:11 +02:00
committed by GitHub
parent c05e7e0c4e
commit 4592d1b21e
2 changed files with 39 additions and 19 deletions

View File

@ -19,10 +19,10 @@
// DEALINGS IN THE SOFTWARE.
use futures::stream::{FuturesUnordered, StreamFuture};
use futures::sync::mpsc;
use futures::sync::{mpsc, oneshot};
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
use std::fmt;
use std::io::Error as IoError;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use {Multiaddr, MuxedTransport, Transport};
/// Creates a swarm.
@ -76,7 +76,7 @@ where
{
transport: T,
new_listeners: mpsc::UnboundedSender<T::Listener>,
new_dialers: mpsc::UnboundedSender<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
new_dialers: mpsc::UnboundedSender<Box<Future<Item = (T::Output, oneshot::Sender<Result<(), IoError>>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = ()>>>,
new_toprocess: mpsc::UnboundedSender<Box<Future<Item = (), Error = IoError>>>,
}
@ -112,8 +112,12 @@ where
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
/// upgraded using the `upgrade`, and the output is sent to the handler that was passed when
/// calling `swarm`.
// TODO: consider returning a future so that errors can be processed?
pub fn dial<Du>(&self, multiaddr: Multiaddr, transport: Du) -> Result<(), Multiaddr>
///
/// Returns a future that is signalled once the closure in the `swarm` has returned its future.
/// Therefore if the closure in the swarm has some side effect (eg. write something in a
/// variable), this side effect will be observable when this future succeeds.
pub fn dial<Du>(&self, multiaddr: Multiaddr, transport: Du)
-> Result<impl Future<Item = (), Error = IoError>, Multiaddr>
where
Du: Transport + 'static, // TODO: 'static :-/
Du::Output: Into<T::Output>,
@ -122,13 +126,31 @@ where
match transport.dial(multiaddr.clone()) {
Ok(dial) => {
let dial = Box::new(
dial.map(|(d, client_addr)| (d.into(), Box::new(client_addr) as Box<Future<Item = _, Error = _>>)),
) as Box<Future<Item = _, Error = _>>;
let (tx, rx) = oneshot::channel();
let dial = dial.then(|result| {
match result {
Ok((output, client_addr)) => {
let client_addr = Box::new(client_addr) as Box<Future<Item = _, Error = _>>;
Ok((output.into(), tx, client_addr))
}
Err(err) => {
debug!("Error in dialer upgrade: {:?}", err);
let _ = tx.send(Err(err));
Err(())
}
}
});
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_dialers.unbounded_send(dial);
Ok(())
let _ = self.new_dialers.unbounded_send(Box::new(dial) as Box<_>);
Ok(rx.then(|result| {
match result {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(_) => Err(IoError::new(IoErrorKind::ConnectionAborted,
"dial cancelled the swarm future has been destroyed")),
}
}))
}
Err((_, multiaddr)) => Err(multiaddr),
}
@ -171,9 +193,9 @@ where
>,
listeners_upgrade:
FuturesUnordered<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
dialers: FuturesUnordered<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
dialers: FuturesUnordered<Box<Future<Item = (T::Output, oneshot::Sender<Result<(), IoError>>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = ()>>>,
new_dialers:
mpsc::UnboundedReceiver<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
mpsc::UnboundedReceiver<Box<Future<Item = (T::Output, oneshot::Sender<Result<(), IoError>>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = ()>>>,
to_process: FuturesUnordered<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
}
@ -291,16 +313,14 @@ where
loop {
match self.dialers.poll() {
Ok(Async::Ready(Some((output, addr)))) => {
Ok(Async::Ready(Some((output, notifier, addr)))) => {
trace!("Successfully upgraded dialed connection");
self.to_process
.push(future::Either::A(handler(output, addr).into_future()));
let _ = notifier.send(Ok(()));
}
Err(err) => {
debug!("Error in dialer upgrade: {:?}", err);
break;
}
_ => break
Err(()) => break,
_ => break,
}
}