Use a different fix

This commit is contained in:
Pierre Krieger
2020-01-06 17:20:01 +01:00
parent be0f18d162
commit 5e0f219555
4 changed files with 25 additions and 44 deletions

View File

@ -356,20 +356,16 @@ where
} }
} }
/// Sends an event to all nodes. /// Sends a message to all nodes.
/// ///
/// Must be called only after a successful call to `poll_ready_broadcast`. /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
pub fn start_broadcast(&mut self, event: &TInEvent) /// has been sent to any node yet.
#[must_use]
pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()>
where where
TInEvent: Clone TInEvent: Clone
{ {
self.inner.start_broadcast(event) self.inner.poll_broadcast(event, cx)
}
/// Wait until we have enough room in senders to broadcast an event.
#[must_use]
pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> {
self.inner.poll_ready_broadcast(cx)
} }
/// Adds an existing connection to a node to the collection. /// Adds an existing connection to a node to the collection.

View File

@ -845,20 +845,16 @@ where
}) })
} }
/// Start sending an event to all nodes. /// Sends a message to all the tasks, including the pending ones.
/// ///
/// Must be called only after a successful call to `poll_ready_broadcast`. /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
pub fn start_broadcast(&mut self, event: &TInEvent) /// has been sent to any node yet.
#[must_use]
pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()>
where where
TInEvent: Clone TInEvent: Clone
{ {
self.active_nodes.start_broadcast(event) self.active_nodes.poll_broadcast(event, cx)
}
/// Wait until we have enough room in senders to broadcast an event.
#[must_use]
pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> {
self.active_nodes.poll_ready_broadcast(cx)
} }
/// Returns a list of all the peers we are currently connected to. /// Returns a list of all the peers we are currently connected to.

View File

@ -221,39 +221,28 @@ impl<I, O, H, E, HE, T, C> Manager<I, O, H, E, HE, T, C> {
task_id task_id
} }
/// Start sending an event to all the tasks, including the pending ones. /// Sends a message to all the tasks, including the pending ones.
/// ///
/// Must be called only after a successful call to `poll_ready_broadcast`. /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
/// /// has been sent to any node yet.
/// After starting a broadcast make sure to finish it with `complete_broadcast`,
/// otherwise starting another broadcast or sending an event directly to a
/// task would overwrite the pending broadcast.
#[must_use] #[must_use]
pub fn start_broadcast(&mut self, event: &I) pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()>
where where
I: Clone I: Clone
{ {
for task in self.tasks.values_mut() { for task in self.tasks.values_mut() {
let msg = ToTaskMessage::HandlerEvent(event.clone()); if let Poll::Pending = task.sender.poll_ready(cx) {
match task.sender.start_send(msg) { return Poll::Pending;
Ok(()) => {},
Err(ref err) if err.is_full() => {
// Note that the user is expected to call `poll_ready_broadcast` beforehand,
// which returns `Poll::Ready` only if the channel isn't full. Reaching this
// path always indicates a mistake in the code.
log::warn!("start_broadcast called while channel was full. Have you called `poll_ready_broadcast` before?");
},
Err(_) => {},
}
} }
} }
/// Wait until we have enough room in senders to broadcast an event.
#[must_use]
pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> {
for task in self.tasks.values_mut() { for task in self.tasks.values_mut() {
if let Poll::Pending = task.sender.poll_ready(cx) { let msg = ToTaskMessage::HandlerEvent(event.clone());
return Poll::Pending; match task.sender.start_send(msg) {
Ok(()) => {},
Err(ref err) if err.is_full() =>
panic!("poll_ready returned Poll::Ready just above; qed"),
Err(_) => {},
} }
} }

View File

@ -29,7 +29,7 @@
//! an existing connection to a node should be driven forward (cf. //! an existing connection to a node should be driven forward (cf.
//! [`Manager::add_connection`]). Tasks can be referred to by [`TaskId`] //! [`Manager::add_connection`]). Tasks can be referred to by [`TaskId`]
//! and messages can be sent to individual tasks or all (cf. //! and messages can be sent to individual tasks or all (cf.
//! [`Manager::start_broadcast`]). Messages produces by tasks can be //! [`Manager::poll_broadcast`]). Messages produces by tasks can be
//! retrieved by polling the manager (cf. [`Manager::poll`]). //! retrieved by polling the manager (cf. [`Manager::poll`]).
mod error; mod error;