diff --git a/core/src/swarm.rs b/core/src/swarm.rs index ec74ae8b..7347506d 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -19,10 +19,10 @@ // DEALINGS IN THE SOFTWARE. use futures::stream::{FuturesUnordered, StreamFuture}; -use futures::sync::mpsc; +use futures::sync::{mpsc, oneshot}; use futures::{future, Async, Future, IntoFuture, Poll, Stream}; use std::fmt; -use std::io::Error as IoError; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use {Multiaddr, MuxedTransport, Transport}; /// Creates a swarm. @@ -76,7 +76,7 @@ where { transport: T, new_listeners: mpsc::UnboundedSender, - new_dialers: mpsc::UnboundedSender>), Error = IoError>>>, + new_dialers: mpsc::UnboundedSender>, Box>), Error = ()>>>, new_toprocess: mpsc::UnboundedSender>>, } @@ -112,8 +112,12 @@ where /// Asks the swarm to dial the node with the given multiaddress. The connection is then /// upgraded using the `upgrade`, and the output is sent to the handler that was passed when /// calling `swarm`. - // TODO: consider returning a future so that errors can be processed? - pub fn dial(&self, multiaddr: Multiaddr, transport: Du) -> Result<(), Multiaddr> + /// + /// Returns a future that is signalled once the closure in the `swarm` has returned its future. + /// Therefore if the closure in the swarm has some side effect (eg. write something in a + /// variable), this side effect will be observable when this future succeeds. + pub fn dial(&self, multiaddr: Multiaddr, transport: Du) + -> Result, Multiaddr> where Du: Transport + 'static, // TODO: 'static :-/ Du::Output: Into, @@ -122,13 +126,31 @@ where match transport.dial(multiaddr.clone()) { Ok(dial) => { - let dial = Box::new( - dial.map(|(d, client_addr)| (d.into(), Box::new(client_addr) as Box>)), - ) as Box>; + let (tx, rx) = oneshot::channel(); + let dial = dial.then(|result| { + match result { + Ok((output, client_addr)) => { + let client_addr = Box::new(client_addr) as Box>; + Ok((output.into(), tx, client_addr)) + } + Err(err) => { + debug!("Error in dialer upgrade: {:?}", err); + let _ = tx.send(Err(err)); + Err(()) + } + } + }); // Ignoring errors if the receiver has been closed, because in that situation // nothing is going to be processed anyway. - let _ = self.new_dialers.unbounded_send(dial); - Ok(()) + let _ = self.new_dialers.unbounded_send(Box::new(dial) as Box<_>); + Ok(rx.then(|result| { + match result { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(err), + Err(_) => Err(IoError::new(IoErrorKind::ConnectionAborted, + "dial cancelled the swarm future has been destroyed")), + } + })) } Err((_, multiaddr)) => Err(multiaddr), } @@ -171,9 +193,9 @@ where >, listeners_upgrade: FuturesUnordered>), Error = IoError>>>, - dialers: FuturesUnordered>), Error = IoError>>>, + dialers: FuturesUnordered>, Box>), Error = ()>>>, new_dialers: - mpsc::UnboundedReceiver>), Error = IoError>>>, + mpsc::UnboundedReceiver>, Box>), Error = ()>>>, to_process: FuturesUnordered>>>, new_toprocess: mpsc::UnboundedReceiver>>, } @@ -291,16 +313,14 @@ where loop { match self.dialers.poll() { - Ok(Async::Ready(Some((output, addr)))) => { + Ok(Async::Ready(Some((output, notifier, addr)))) => { trace!("Successfully upgraded dialed connection"); self.to_process .push(future::Either::A(handler(output, addr).into_future())); + let _ = notifier.send(Ok(())); } - Err(err) => { - debug!("Error in dialer upgrade: {:?}", err); - break; - } - _ => break + Err(()) => break, + _ => break, } } diff --git a/kad/src/high_level.rs b/kad/src/high_level.rs index fa112c90..dc5bd95c 100644 --- a/kad/src/high_level.rs +++ b/kad/src/high_level.rs @@ -494,7 +494,7 @@ where match self.swarm_controller .dial(addr, self.kademlia_transport.clone().map(move |out, _| map(out))) { - Ok(()) => (), + Ok(_) => (), Err(_addr) => { let fut = future::err(IoError::new( IoErrorKind::InvalidData,