diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 9e212810..596ad3b1 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -358,18 +358,14 @@ where /// Sends an event to all nodes. /// - /// Must be called only after a successful call to `poll_ready_broadcast`. - pub fn start_broadcast(&mut self, event: &TInEvent) + /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event + /// has been sent to any node yet. + #[must_use] + pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()> where TInEvent: Clone { - self.inner.start_broadcast(event) - } - - /// Wait until we have enough room in senders to broadcast an event. - #[must_use] - pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> { - self.inner.poll_ready_broadcast(cx) + self.inner.poll_broadcast(event, cx) } /// Adds an existing connection to a node to the collection. diff --git a/core/src/nodes/network.rs b/core/src/nodes/network.rs index c6c69a9d..3975b021 100644 --- a/core/src/nodes/network.rs +++ b/core/src/nodes/network.rs @@ -845,20 +845,16 @@ where }) } - /// Start sending an event to all nodes. + /// Sends an event to all nodes. /// - /// Must be called only after a successful call to `poll_ready_broadcast`. - pub fn start_broadcast(&mut self, event: &TInEvent) + /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event + /// has been sent to any node yet. + #[must_use] + pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()> where TInEvent: Clone { - self.active_nodes.start_broadcast(event) - } - - /// Wait until we have enough room in senders to broadcast an event. - #[must_use] - pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> { - self.active_nodes.poll_ready_broadcast(cx) + self.active_nodes.poll_broadcast(event, cx) } /// Returns a list of all the peers we are currently connected to. diff --git a/core/src/nodes/tasks/manager.rs b/core/src/nodes/tasks/manager.rs index b17b1cf3..dbfe485a 100644 --- a/core/src/nodes/tasks/manager.rs +++ b/core/src/nodes/tasks/manager.rs @@ -67,7 +67,7 @@ pub struct Manager { /// `local_spawns` list instead. threads_pool: Option, - /// If no executor is available, we move tasks to this list, and futures are polled on the + /// If no executor is available, we move tasks to this set, and futures are polled on the /// current thread instead. local_spawns: FuturesUnordered + Send>>>, @@ -221,36 +221,30 @@ impl Manager { task_id } - /// Start sending an event to all the tasks, including the pending ones. + /// Sends a message to all the tasks, including the pending ones. /// - /// Must be called only after a successful call to `poll_ready_broadcast`. - /// - /// After starting a broadcast make sure to finish it with `complete_broadcast`, - /// otherwise starting another broadcast or sending an event directly to a - /// task would overwrite the pending broadcast. + /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event + /// has been sent to any node yet. #[must_use] - pub fn start_broadcast(&mut self, event: &I) + pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()> where I: Clone { + for task in self.tasks.values_mut() { + if let Poll::Pending = task.sender.poll_ready(cx) { + return Poll::Pending; + } + } + for task in self.tasks.values_mut() { let msg = ToTaskMessage::HandlerEvent(event.clone()); match task.sender.start_send(msg) { Ok(()) => {}, - Err(ref err) if err.is_full() => {}, // TODO: somehow report to user? + Err(ref err) if err.is_full() => + panic!("poll_ready returned Poll::Ready just above; qed"), Err(_) => {}, } } - } - - /// Wait until we have enough room in senders to broadcast an event. - #[must_use] - pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> { - for task in self.tasks.values_mut() { - if let Poll::Pending = task.sender.poll_ready(cx) { - return Poll::Pending; - } - } Poll::Ready(()) } @@ -470,4 +464,3 @@ impl fmt::Debug for ClosedTask { .finish() } } - diff --git a/core/src/nodes/tasks/mod.rs b/core/src/nodes/tasks/mod.rs index 2af4939c..5275121f 100644 --- a/core/src/nodes/tasks/mod.rs +++ b/core/src/nodes/tasks/mod.rs @@ -29,7 +29,7 @@ //! an existing connection to a node should be driven forward (cf. //! [`Manager::add_connection`]). Tasks can be referred to by [`TaskId`] //! and messages can be sent to individual tasks or all (cf. -//! [`Manager::start_broadcast`]). Messages produces by tasks can be +//! [`Manager::poll_broadcast`]). Messages produces by tasks can be //! retrieved by polling the manager (cf. [`Manager::poll`]). mod error;