diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 5be1fc4f..2761a877 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1454,7 +1454,7 @@ impl fmt::Debug for GossipsubRpc { } /// Event that can happen on the gossipsub behaviour. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum GossipsubEvent { /// A message has been received. This contains the PeerId that we received the message from, /// the message id (used if the application layer needs to propagate the message) and the @@ -1534,4 +1534,3 @@ impl fmt::Debug for PublishConfig { } } } - diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 25e6d64d..28849622 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -103,32 +103,23 @@ impl Graph { } } - /// Polls the graph and passes each event into the provided FnMut until it returns `true`. - fn wait_for(self, mut f: F) -> Self - where - F: FnMut(GossipsubEvent) -> bool, - { - // The future below should return self. Given that it is a FnMut and not a FnOnce, one needs - // to wrap `self` in an Option, leaving a `None` behind after the final `Poll::Ready`. - let mut this = Some(self); - - let fut = futures::future::poll_fn(move |cx| match &mut this { - Some(graph) => loop { - match graph.poll_unpin(cx) { - Poll::Ready((_addr, ev)) => { - if f(ev) { - return Poll::Ready(this.take().unwrap()); - } - } - Poll::Pending => return Poll::Pending, + /// Polls the graph and passes each event into the provided FnMut until the closure returns + /// `true`. + /// + /// Returns [`true`] on success and [`false`] on timeout. + fn wait_for bool>(&mut self, mut f: F) -> bool { + let fut = futures::future::poll_fn(move |cx| { + match self.poll_unpin(cx) { + Poll::Ready((_addr, ev)) if f(ev.clone()) => { + Poll::Ready(()) } - }, - None => panic!("future called after final return"), + _ => Poll::Pending, + } }); let fut = async_std::future::timeout(Duration::from_secs(10), fut); - futures::executor::block_on(fut).unwrap() + futures::executor::block_on(fut).is_ok() } /// Polls the graph until Poll::Pending is obtained, completing the underlying polls. @@ -215,7 +206,7 @@ fn multi_hop_propagation() { // Wait for all nodes to be subscribed. let mut subscribed = 0; - graph = graph.wait_for(move |ev| { + let all_subscribed = graph.wait_for(move |ev| { if let GossipsubEvent::Subscribed { .. } = ev { subscribed += 1; if subscribed == (number_nodes - 1) * 2 { @@ -225,6 +216,12 @@ fn multi_hop_propagation() { false }); + if !all_subscribed { + return TestResult::error(format!( + "Timed out waiting for all nodes to subscribe but only have {:?}/{:?}.", + subscribed, num_nodes, + )); + } // It can happen that the publish occurs before all grafts have completed causing this test // to fail. We drain all the poll messages before publishing. @@ -235,7 +232,7 @@ fn multi_hop_propagation() { // Wait for all nodes to receive the published message. let mut received_msgs = 0; - graph.wait_for(move |ev| { + let all_received = graph.wait_for(move |ev| { if let GossipsubEvent::Message(..) = ev { received_msgs += 1; if received_msgs == number_nodes - 1 { @@ -245,6 +242,12 @@ fn multi_hop_propagation() { false }); + if !all_received { + return TestResult::error(format!( + "Timed out waiting for all nodes to receive the msg but only have {:?}/{:?}.", + received_msgs, num_nodes, + )); + } TestResult::passed() }