diff --git a/core/src/lib.rs b/core/src/lib.rs index c7956b98..f0df1e7c 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -177,7 +177,7 @@ //! extern crate libp2p_tcp_transport; //! extern crate tokio_current_thread; //! -//! use futures::Future; +//! use futures::{Future, Stream}; //! use libp2p_ping::{Ping, PingOutput}; //! use libp2p_core::Transport; //! @@ -200,7 +200,7 @@ //! swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); //! //! // Runs until everything is finished. -//! tokio_current_thread::block_on_all(swarm_future).unwrap(); +//! tokio_current_thread::block_on_all(swarm_future.for_each(|_| Ok(()))).unwrap(); //! # } //! ``` @@ -252,7 +252,7 @@ pub use self::multiaddr::{AddrComponent, Multiaddr}; pub use self::muxing::StreamMuxer; pub use self::peer_id::PeerId; pub use self::public_key::PublicKey; -pub use self::swarm::{swarm, SwarmController, SwarmFuture}; +pub use self::swarm::{swarm, SwarmController, SwarmEvents}; pub use self::transport::{MuxedTransport, Transport}; pub use self::unique::{UniqueConnec, UniqueConnecFuture, UniqueConnecState}; pub use self::upgrade::{ConnectionUpgrade, Endpoint}; diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 719dda3e..35f58857 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -38,7 +38,7 @@ use {Multiaddr, MuxedTransport, Transport}; pub fn swarm( transport: T, handler: H, -) -> (SwarmController, SwarmFuture) +) -> (SwarmController, SwarmEvents) where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ H: FnMut(T::Output, Box>) -> F, @@ -53,7 +53,7 @@ where task_to_notify: None, })); - let future = SwarmFuture { + let future = SwarmEvents { transport: transport.clone(), shared: shared.clone(), handler: handler, @@ -68,18 +68,18 @@ where } /// Allows control of what the swarm is doing. -pub struct SwarmController +pub struct SwarmController where T: MuxedTransport + 'static, // TODO: 'static :-/ { /// Shared between the swarm infrastructure. - shared: Arc>>, + shared: Arc>>, /// Transport used to dial or listen. transport: T, } -impl fmt::Debug for SwarmController +impl fmt::Debug for SwarmController where T: fmt::Debug + MuxedTransport + 'static, // TODO: 'static :-/ { @@ -90,11 +90,11 @@ where } } -impl Clone for SwarmController +impl Clone for SwarmController where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ { - fn clone(&self) -> SwarmController { + fn clone(&self) -> Self { SwarmController { transport: self.transport.clone(), shared: self.shared.clone(), @@ -102,9 +102,10 @@ where } } -impl SwarmController +impl SwarmController where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ + F: 'static, { /// Asks the swarm to dial the node with the given multiaddress. The connection is then /// upgraded using the `upgrade`, and the output is sent to the handler that was passed when @@ -127,12 +128,12 @@ where /// dialing fails or the handler has been called with the resulting future. /// /// The returned future is filled with the output of `then`. - pub(crate) fn dial_then(&self, multiaddr: Multiaddr, transport: Du, then: F) + pub(crate) fn dial_then(&self, multiaddr: Multiaddr, transport: Du, then: TThen) -> Result, Multiaddr> where Du: Transport + 'static, // TODO: 'static :-/ Du::Output: Into, - F: FnOnce(Result<(), IoError>) -> Result<(), IoError> + 'static, + TThen: FnOnce(Result<(), IoError>) -> Result<(), IoError> + 'static, { trace!("Swarm dialing {}", multiaddr); @@ -157,8 +158,9 @@ where } Err(err) => { debug!("Error in dialer upgrade: {:?}", err); + let err_clone = IoError::new(err.kind(), err.to_string()); then(Err(err)); - Err(()) + Err(err_clone) } } }); @@ -210,7 +212,7 @@ where Box::new(f) as Box> }), ) as Box>; - shared.listeners.push(listener.into_future()); + shared.listeners.push((new_addr.clone(), listener.into_future())); if let Some(task) = shared.task_to_notify.take() { task.notify(); } @@ -223,12 +225,12 @@ where /// Future that must be driven to completion in order for the swarm to work. #[must_use = "futures do nothing unless polled"] -pub struct SwarmFuture +pub struct SwarmEvents where T: MuxedTransport + 'static, // TODO: 'static :-/ { /// Shared between the swarm infrastructure. - shared: Arc>>, + shared: Arc>>, /// The transport used to dial. transport: T, @@ -237,17 +239,17 @@ where handler: H, } -impl Future for SwarmFuture +impl Stream for SwarmEvents where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/, H: FnMut(T::Output, Box>) -> If, If: IntoFuture, F: Future + 'static, // TODO: 'static :-/ { - type Item = (); + type Item = SwarmEvent; type Error = IoError; - fn poll(&mut self) -> Poll { + fn poll(&mut self) -> Poll, Self::Error> { let mut shared = self.shared.lock(); let handler = &mut self.handler; @@ -266,7 +268,7 @@ where // TODO: should that stop everything? debug!("Error in multiplexed incoming connection: {:?}", err); shared.next_incoming = self.transport.clone().next_incoming(); - break; + return Ok(Async::Ready(Some(SwarmEvent::IncomingError(err)))); } } } @@ -274,7 +276,7 @@ where // We remove each element from `shared.listeners` one by one and add them back only // if relevant. for n in (0 .. shared.listeners.len()).rev() { - let mut listener = shared.listeners.swap_remove(n); + let (listen_addr, mut listener) = shared.listeners.swap_remove(n); loop { match listener.poll() { Ok(Async::Ready((Some(upgrade), remaining))) => { @@ -284,14 +286,19 @@ where } Ok(Async::Ready((None, _))) => { debug!("Listener closed gracefully"); - break; + return Ok(Async::Ready(Some(SwarmEvent::ListenerClosed { + listen_addr + }))); }, - Err((err, _)) => { - debug!("Error in listener: {:?}", err); - break; + Err((error, _)) => { + debug!("Error in listener: {:?}", error); + return Ok(Async::Ready(Some(SwarmEvent::ListenerError { + listen_addr, + error, + }))); } Ok(Async::NotReady) => { - shared.listeners.push(listener); + shared.listeners.push((listen_addr, listener)); break; } } @@ -307,10 +314,11 @@ where debug!("Successfully upgraded incoming connection"); // TODO: unlock mutex before calling handler, in order to avoid deadlocks if // the user does something stupid - shared.to_process.push(Box::new(handler(output, client_addr).into_future())); + shared.to_process.push(handler(output, client_addr).into_future()); } Err(err) => { debug!("Error in listener upgrade: {:?}", err); + return Ok(Async::Ready(Some(SwarmEvent::ListenerUpgradeError(err)))); } Ok(Async::NotReady) => { shared.listeners_upgrade.push(listener_upgrade); @@ -327,10 +335,15 @@ where trace!("Successfully upgraded dialed connection"); // TODO: unlock mutex before calling handler, in order to avoid deadlocks if // the user does something stupid - shared.to_process.push(Box::new(handler(output, addr).into_future())); + shared.to_process.push(handler(output, addr).into_future()); notifier(Ok(())); } - Err(()) => {}, + Err(error) => { + return Ok(Async::Ready(Some(SwarmEvent::DialFailed { + client_addr, + error, + }))); + }, Ok(Async::NotReady) => { shared.dialers.push((client_addr, dialer)); }, @@ -344,9 +357,16 @@ where match to_process.poll() { Ok(Async::Ready(())) => { trace!("Future returned by swarm handler driven to completion"); + return Ok(Async::Ready(Some(SwarmEvent::HandlerFinished { + handler_future: to_process, + }))); } - Err(err) => { - debug!("Error in processing: {:?}", err); + Err(error) => { + debug!("Error in processing: {:?}", error); + return Ok(Async::Ready(Some(SwarmEvent::HandlerError { + handler_future: to_process, + error, + }))); } Ok(Async::NotReady) => { shared.to_process.push(to_process); @@ -363,12 +383,13 @@ where } // TODO: stronger typing -struct Shared where T: MuxedTransport + 'static { +struct Shared where T: MuxedTransport + 'static { /// Next incoming substream on the transport. next_incoming: T::Incoming, /// All the active listeners. - listeners: Vec< + listeners: Vec<( + Multiaddr, StreamFuture< Box< Stream< @@ -377,7 +398,7 @@ struct Shared where T: MuxedTransport + 'static { >, >, >, - >, + )>, /// Futures that upgrade an incoming listening connection to a full connection. listeners_upgrade: @@ -386,19 +407,65 @@ struct Shared where T: MuxedTransport + 'static { /// Futures that dial a remote address. /// /// Contains the address we dial, so that we can cancel it if necessary. - dialers: Vec<(Multiaddr, Box)>, Box>), Error = ()>>)>, + dialers: Vec<(Multiaddr, Box)>, Box>), Error = IoError>>)>, /// List of futures produced by the swarm closure. Must be processed to the end. - to_process: Vec>>, + to_process: Vec, /// The task to notify whenever we add a new element in one of the lists. /// Necessary so that the task wakes up and the element gets polled. task_to_notify: Option, } +/// Event that happens in the swarm. +#[derive(Debug)] +pub enum SwarmEvent { + /// An error has happened while polling the muxed transport for incoming connections. + IncomingError(IoError), + + /// A listener has gracefully closed. + ListenerClosed { + /// Address the listener was listening on. + listen_addr: Multiaddr, + }, + + /// A listener has stopped because it produced an error. + ListenerError { + /// Address the listener was listening on. + listen_addr: Multiaddr, + /// The error that happened. + error: IoError, + }, + + /// An error happened while upgrading an incoming connection. + ListenerUpgradeError(IoError), + + /// Failed to dial a remote address. + DialFailed { + /// Address we were trying to dial. + client_addr: Multiaddr, + /// Error that happened. + error: IoError, + }, + + /// A future returned by the handler has finished. + HandlerFinished { + /// The future originally returned by the handler. + handler_future: F, + }, + + /// A future returned by the handler has produced an error. + HandlerError { + /// The future originally returned by the handler. + handler_future: F, + /// The error that happened. + error: IoError, + }, +} + #[cfg(test)] mod tests { - use futures::{Future, future}; + use futures::{Future, Stream, future}; use rand; use transport::{self, DeniedTransport, Transport}; use std::io::Error as IoError; @@ -441,8 +508,8 @@ mod tests { }); let dial_success = swarm_ctrl2.dial("/memory".parse().unwrap(), tx).unwrap(); - let future = swarm_future2 - .select(swarm_future1).map(|_| ()).map_err(|(err, _)| err) + let future = swarm_future2.for_each(|_| Ok(())) + .select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err) .select(dial_success).map(|_| ()).map_err(|(err, _)| err); current_thread::Runtime::new().unwrap().block_on(future).unwrap(); @@ -468,7 +535,7 @@ mod tests { } let future = future::join_all(dials) .map(|_| ()) - .select(swarm_future) + .select(swarm_future.for_each(|_| Ok(()))) .map_err(|(err, _)| err); current_thread::Runtime::new().unwrap().block_on(future).unwrap(); assert_eq!(reached.load(atomic::Ordering::SeqCst), num_dials); @@ -484,7 +551,7 @@ mod tests { }); swarm_ctrl.listen_on("/memory".parse().unwrap()).unwrap(); let dial_success = swarm_ctrl.dial("/memory".parse().unwrap(), tx).unwrap(); - let future = dial_success.select(swarm_future) + let future = dial_success.select(swarm_future.for_each(|_| Ok(()))) .map_err(|(err, _)| err); current_thread::Runtime::new().unwrap().block_on(future).unwrap(); } diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index e3f6b19a..ffbb2069 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -264,7 +264,9 @@ mod tests { .map(|_| ()) .map_err(|((), _)| io::Error::new(io::ErrorKind::Other, "receive error")); - let future = future.select(finish_rx) + let future = future + .for_each(|_| Ok(())) + .select(finish_rx) .map(|_| ()) .map_err(|(e, _)| e); diff --git a/core/src/unique.rs b/core/src/unique.rs index 003ea1a5..084596f0 100644 --- a/core/src/unique.rs +++ b/core/src/unique.rs @@ -99,12 +99,13 @@ impl UniqueConnec { /// One critical property of this method, is that if a connection incomes and `tie_*` is /// called, then it will be returned by the returned future. #[inline] - pub fn dial(&self, swarm: &SwarmController, multiaddr: &Multiaddr, - transport: Du) -> UniqueConnecFuture + pub fn dial(&self, swarm: &SwarmController, multiaddr: &Multiaddr, + transport: Du) -> UniqueConnecFuture where T: Clone + 'static, // TODO: 'static :-/ Du: Transport + 'static, // TODO: 'static :-/ Du::Output: Into, S: Clone + MuxedTransport, + F: 'static, { self.dial_inner(swarm, multiaddr, transport, true) } @@ -112,23 +113,25 @@ impl UniqueConnec { /// Same as `dial`, except that the future will produce an error if an earlier attempt to dial /// has errored. #[inline] - pub fn dial_if_empty(&self, swarm: &SwarmController, multiaddr: &Multiaddr, - transport: Du) -> UniqueConnecFuture + pub fn dial_if_empty(&self, swarm: &SwarmController, multiaddr: &Multiaddr, + transport: Du) -> UniqueConnecFuture where T: Clone + 'static, // TODO: 'static :-/ Du: Transport + 'static, // TODO: 'static :-/ Du::Output: Into, S: Clone + MuxedTransport, + F: 'static, { self.dial_inner(swarm, multiaddr, transport, false) } /// Inner implementation of `dial_*`. - fn dial_inner(&self, swarm: &SwarmController, multiaddr: &Multiaddr, - transport: Du, dial_if_err: bool) -> UniqueConnecFuture + fn dial_inner(&self, swarm: &SwarmController, multiaddr: &Multiaddr, + transport: Du, dial_if_err: bool) -> UniqueConnecFuture where T: Clone + 'static, // TODO: 'static :-/ Du: Transport + 'static, // TODO: 'static :-/ Du::Output: Into, S: Clone + MuxedTransport, + F: 'static, { let mut inner = self.inner.lock(); match &*inner { @@ -426,7 +429,7 @@ pub enum UniqueConnecState { #[cfg(test)] mod tests { - use futures::{future, sync::oneshot, Future}; + use futures::{future, sync::oneshot, Future, Stream}; use transport::DeniedTransport; use std::io::Error as IoError; use std::sync::{Arc, atomic}; @@ -456,7 +459,7 @@ mod tests { .map(|val| { assert_eq!(val, 12); }); assert_eq!(unique_connec.state(), UniqueConnecState::Pending); - let future = dial_success.select(swarm_future).map_err(|(err, _)| err); + let future = dial_success.select(swarm_future.for_each(|_| Ok(()))).map_err(|(err, _)| err); current_thread::Runtime::new().unwrap().block_on(future).unwrap(); assert_eq!(unique_connec.state(), UniqueConnecState::Full); } @@ -526,8 +529,8 @@ mod tests { }); let future = dial_success - .select(swarm_future2).map(|_| ()).map_err(|(err, _)| err) - .select(swarm_future1).map(|_| ()).map_err(|(err, _)| err); + .select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err) + .select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err); current_thread::Runtime::new().unwrap().block_on(future).unwrap(); assert!(unique_connec.is_alive()); @@ -557,7 +560,7 @@ mod tests { swarm_ctrl.dial("/memory".parse().unwrap(), tx) .unwrap(); - let future = dial_success.select(swarm_future).map_err(|(err, _)| err); + let future = dial_success.select(swarm_future.for_each(|_| Ok(()))).map_err(|(err, _)| err); current_thread::Runtime::new().unwrap().block_on(future).unwrap(); assert_eq!(unique_connec.poll(), Some(13)); } @@ -601,8 +604,8 @@ mod tests { }); let future = dial_success - .select(swarm_future1).map(|_| ()).map_err(|(err, _)| err) - .select(swarm_future2).map(|_| ()).map_err(|(err, _)| err); + .select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err) + .select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err); current_thread::Runtime::new().unwrap().block_on(future).unwrap(); assert!(!unique_connec.is_alive()); @@ -654,8 +657,8 @@ mod tests { }); let future = dial_success - .select(swarm_future1).map(|_| ()).map_err(|(err, _)| err) - .select(swarm_future2).map(|_| ()).map_err(|(err, _)| err); + .select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err) + .select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err); current_thread::Runtime::new().unwrap().block_on(future).unwrap(); assert!(!unique_connec.is_alive()); @@ -699,8 +702,8 @@ mod tests { }); let future = dial_success - .select(swarm_future1).map(|_| ()).map_err(|(err, _)| err) - .select(swarm_future2).map(|_| ()).map_err(|(err, _)| err); + .select(swarm_future1.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err) + .select(swarm_future2.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err); current_thread::Runtime::new().unwrap().block_on(future).unwrap(); } @@ -726,7 +729,7 @@ mod tests { drop(unique_connec); let future = dial_success - .select(swarm_future).map(|_| ()).map_err(|(err, _)| err); + .select(swarm_future.for_each(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err); current_thread::Runtime::new().unwrap().block_on(future).unwrap(); } diff --git a/core/tests/lots_of_connec.rs b/core/tests/lots_of_connec.rs index aa0a5f22..99a69a9e 100644 --- a/core/tests/lots_of_connec.rs +++ b/core/tests/lots_of_connec.rs @@ -27,7 +27,7 @@ extern crate rand; extern crate tokio_current_thread; extern crate tokio_io; -use futures::{future, future::Future}; +use futures::{future, future::Future, Stream}; use libp2p_core::Transport; use libp2p_tcp_transport::TcpConfig; use std::sync::{atomic, Arc}; @@ -52,7 +52,7 @@ fn lots_of_swarms() { ); swarm_controllers.push(ctrl); - swarm_futures.push(fut); + swarm_futures.push(fut.for_each(|_| Ok(()))); } let mut addresses = Vec::new(); diff --git a/examples/echo-dialer.rs b/examples/echo-dialer.rs index bc8f4123..52f5a528 100644 --- a/examples/echo-dialer.rs +++ b/examples/echo-dialer.rs @@ -133,6 +133,7 @@ fn main() { // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the // future through the tokio core. let final_future = swarm_future + .for_each(|_| Ok(())) .select(finished_rx.map_err(|_| unreachable!())) .map(|_| ()) .map_err(|(err, _)| err); diff --git a/examples/echo-server.rs b/examples/echo-server.rs index e6b67867..2c504b6d 100644 --- a/examples/echo-server.rs +++ b/examples/echo-server.rs @@ -141,5 +141,5 @@ fn main() { // `swarm_future` is a future that contains all the behaviour that we want, but nothing has // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the // future through the tokio core. - tokio_current_thread::block_on_all(swarm_future).unwrap(); + tokio_current_thread::block_on_all(swarm_future.for_each(|_| Ok(()))).unwrap(); } diff --git a/examples/floodsub.rs b/examples/floodsub.rs index 6b45ca42..4c0aed5a 100644 --- a/examples/floodsub.rs +++ b/examples/floodsub.rs @@ -149,6 +149,7 @@ fn main() { }; let final_fut = swarm_future + .for_each(|_| Ok(())) .select(floodsub_rx) .map(|_| ()) .map_err(|e| e.0) diff --git a/examples/kademlia.rs b/examples/kademlia.rs index aa5fa950..1770f26b 100644 --- a/examples/kademlia.rs +++ b/examples/kademlia.rs @@ -18,7 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -#![type_length_limit = "2097152"] +// Libp2p's code unfortunately produces very large types. Rust's default length limit for type +// names is not large enough, therefore we need this attribute. +#![type_length_limit = "4194304"] extern crate bigint; extern crate bytes; @@ -231,7 +233,7 @@ fn main() { // future through the tokio core. tokio_current_thread::block_on_all( finish_enum - .select(swarm_future) + .select(swarm_future.for_each(|_| Ok(()))) .map(|(n, _)| n) .map_err(|(err, _)| err), ).unwrap(); diff --git a/examples/ping-client.rs b/examples/ping-client.rs index db85970b..16ee07e4 100644 --- a/examples/ping-client.rs +++ b/examples/ping-client.rs @@ -25,7 +25,7 @@ extern crate libp2p; extern crate tokio_current_thread; extern crate tokio_io; -use futures::Future; +use futures::{Future, Stream}; use futures::sync::oneshot; use std::env; use libp2p::core::Transport; @@ -112,7 +112,7 @@ fn main() { // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the // future through the tokio core. tokio_current_thread::block_on_all( - rx.select(swarm_future.map_err(|_| unreachable!())) + rx.select(swarm_future.for_each(|_| Ok(())).map_err(|_| unreachable!())) .map_err(|(e, _)| e) .map(|_| ()), ).unwrap(); diff --git a/examples/relay.rs b/examples/relay.rs index 5d453d53..0eab6c4b 100644 --- a/examples/relay.rs +++ b/examples/relay.rs @@ -152,7 +152,7 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box> { control.dial(address, transport.with_upgrade(echo)).map_err(|_| "failed to dial")?; - tokio_current_thread::block_on_all(future).map_err(From::from) + tokio_current_thread::block_on_all(future.for_each(|_| Ok(()))).map_err(From::from) } fn run_listener(opts: ListenerOpts) -> Result<(), Box> { @@ -206,7 +206,7 @@ fn run_listener(opts: ListenerOpts) -> Result<(), Box> { }); control.listen_on(opts.listen).map_err(|_| "failed to listen")?; - tokio_current_thread::block_on_all(future).map_err(From::from) + tokio_current_thread::block_on_all(future.for_each(|_| Ok(()))).map_err(From::from) } // Custom parsers ///////////////////////////////////////////////////////////