Merge pull request #1373 from tomaka/loggin-stable-fut

Address review on stable-futures
This commit is contained in:
Pierre Krieger 2020-01-06 18:38:28 +01:00 committed by GitHub
commit ced66a319c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 40 deletions

View File

@ -358,18 +358,14 @@ where
/// Sends an event to all nodes.
///
/// Must be called only after a successful call to `poll_ready_broadcast`.
pub fn start_broadcast(&mut self, event: &TInEvent)
/// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
/// has been sent to any node yet.
#[must_use]
pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()>
where
TInEvent: Clone
{
self.inner.start_broadcast(event)
}
/// Wait until we have enough room in senders to broadcast an event.
#[must_use]
pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> {
self.inner.poll_ready_broadcast(cx)
self.inner.poll_broadcast(event, cx)
}
/// Adds an existing connection to a node to the collection.

View File

@ -845,20 +845,16 @@ where
})
}
/// Start sending an event to all nodes.
/// Sends an event to all nodes.
///
/// Must be called only after a successful call to `poll_ready_broadcast`.
pub fn start_broadcast(&mut self, event: &TInEvent)
/// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
/// has been sent to any node yet.
#[must_use]
pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()>
where
TInEvent: Clone
{
self.active_nodes.start_broadcast(event)
}
/// Wait until we have enough room in senders to broadcast an event.
#[must_use]
pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> {
self.active_nodes.poll_ready_broadcast(cx)
self.active_nodes.poll_broadcast(event, cx)
}
/// Returns a list of all the peers we are currently connected to.

View File

@ -67,7 +67,7 @@ pub struct Manager<I, O, H, E, HE, T, C = PeerId> {
/// `local_spawns` list instead.
threads_pool: Option<ThreadPool>,
/// If no executor is available, we move tasks to this list, and futures are polled on the
/// If no executor is available, we move tasks to this set, and futures are polled on the
/// current thread instead.
local_spawns: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
@ -221,36 +221,30 @@ impl<I, O, H, E, HE, T, C> Manager<I, O, H, E, HE, T, C> {
task_id
}
/// Start sending an event to all the tasks, including the pending ones.
/// Sends a message to all the tasks, including the pending ones.
///
/// Must be called only after a successful call to `poll_ready_broadcast`.
///
/// After starting a broadcast make sure to finish it with `complete_broadcast`,
/// otherwise starting another broadcast or sending an event directly to a
/// task would overwrite the pending broadcast.
/// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
/// has been sent to any node yet.
#[must_use]
pub fn start_broadcast(&mut self, event: &I)
pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()>
where
I: Clone
{
for task in self.tasks.values_mut() {
if let Poll::Pending = task.sender.poll_ready(cx) {
return Poll::Pending;
}
}
for task in self.tasks.values_mut() {
let msg = ToTaskMessage::HandlerEvent(event.clone());
match task.sender.start_send(msg) {
Ok(()) => {},
Err(ref err) if err.is_full() => {}, // TODO: somehow report to user?
Err(ref err) if err.is_full() =>
panic!("poll_ready returned Poll::Ready just above; qed"),
Err(_) => {},
}
}
}
/// Wait until we have enough room in senders to broadcast an event.
#[must_use]
pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> {
for task in self.tasks.values_mut() {
if let Poll::Pending = task.sender.poll_ready(cx) {
return Poll::Pending;
}
}
Poll::Ready(())
}
@ -470,4 +464,3 @@ impl<E, T: fmt::Debug> fmt::Debug for ClosedTask<E, T> {
.finish()
}
}

View File

@ -29,7 +29,7 @@
//! an existing connection to a node should be driven forward (cf.
//! [`Manager::add_connection`]). Tasks can be referred to by [`TaskId`]
//! and messages can be sent to individual tasks or all (cf.
//! [`Manager::start_broadcast`]). Messages produces by tasks can be
//! [`Manager::poll_broadcast`]). Messages produces by tasks can be
//! retrieved by polling the manager (cf. [`Manager::poll`]).
mod error;