Remove notifying tasks (#528)

This commit is contained in:
Pierre Krieger
2018-10-01 11:18:00 +02:00
committed by GitHub
parent 895557cb82
commit a9eefe1015
4 changed files with 3 additions and 39 deletions

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use fnv::FnvHashMap; use fnv::FnvHashMap;
use futures::{prelude::*, stream, sync::mpsc, task}; use futures::{prelude::*, stream, sync::mpsc};
use muxing::StreamMuxer; use muxing::StreamMuxer;
use nodes::node::Substream; use nodes::node::Substream;
use nodes::handled_node::{HandledNode, NodeHandler}; use nodes::handled_node::{HandledNode, NodeHandler};
@ -62,8 +62,6 @@ pub struct HandledNodesTasks<TInEvent, TOutEvent> {
/// List of node tasks to spawn. /// List of node tasks to spawn.
// TODO: stronger typing? // TODO: stronger typing?
to_spawn: SmallVec<[Box<Future<Item = (), Error = ()> + Send>; 8]>, to_spawn: SmallVec<[Box<Future<Item = (), Error = ()> + Send>; 8]>,
/// Task to notify when an element is added to `to_spawn`.
to_notify: Option<task::Task>,
/// Sender to emit events to the outside. Meant to be cloned and sent to tasks. /// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent>, TaskId)>, events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent>, TaskId)>,
@ -115,7 +113,6 @@ impl<TInEvent, TOutEvent> HandledNodesTasks<TInEvent, TOutEvent> {
tasks: Default::default(), tasks: Default::default(),
next_task_id: TaskId(0), next_task_id: TaskId(0),
to_spawn: SmallVec::new(), to_spawn: SmallVec::new(),
to_notify: None,
events_tx, events_tx,
events_rx, events_rx,
} }
@ -155,12 +152,6 @@ impl<TInEvent, TOutEvent> HandledNodesTasks<TInEvent, TOutEvent> {
}); });
self.to_spawn.push(task); self.to_spawn.push(task);
// We notify the polling task so that `to_spawn` gets flushed.
if let Some(task) = self.to_notify.take() {
task.notify();
}
task_id task_id
} }
@ -264,7 +255,6 @@ impl<TInEvent, TOutEvent> Stream for HandledNodesTasks<TInEvent, TOutEvent> {
} }
} }
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
self.to_notify = Some(task::current());
break Ok(Async::NotReady); break Ok(Async::NotReady);
} }
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {

View File

@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::{prelude::*, task}; use futures::prelude::*;
use std::fmt; use std::fmt;
use void::Void; use void::Void;
use {Multiaddr, Transport}; use {Multiaddr, Transport};
@ -34,8 +34,6 @@ where
transport: TTrans, transport: TTrans,
/// All the active listeners. /// All the active listeners.
listeners: Vec<Listener<TTrans>>, listeners: Vec<Listener<TTrans>>,
/// Task to notify when we add a new listener to `listeners`, so that we start polling.
to_notify: Option<task::Task>,
} }
/// A single active listener. /// A single active listener.
@ -83,7 +81,6 @@ where
ListenersStream { ListenersStream {
transport, transport,
listeners: Vec::new(), listeners: Vec::new(),
to_notify: None,
} }
} }
@ -94,7 +91,6 @@ where
ListenersStream { ListenersStream {
transport, transport,
listeners: Vec::with_capacity(capacity), listeners: Vec::with_capacity(capacity),
to_notify: None,
} }
} }
@ -116,10 +112,6 @@ where
address: new_addr.clone(), address: new_addr.clone(),
}); });
if let Some(task) = self.to_notify.take() {
task.notify();
}
Ok(new_addr) Ok(new_addr)
} }
@ -177,7 +169,6 @@ where
} }
// We register the current task to be waken up if a new listener is added. // We register the current task to be waken up if a new listener is added.
self.to_notify = Some(task::current());
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }

View File

@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::{prelude::*, task}; use futures::prelude::*;
use muxing; use muxing;
use smallvec::SmallVec; use smallvec::SmallVec;
use std::fmt; use std::fmt;
@ -67,9 +67,6 @@ where
address: Addr<TAddrFut>, address: Addr<TAddrFut>,
/// List of substreams we are currently opening. /// List of substreams we are currently opening.
outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>, outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>,
/// Task to notify when a new element is added to `outbound_substreams`, so that we can start
/// polling it.
to_notify: Option<task::Task>,
} }
/// Address of the node. /// Address of the node.
@ -140,7 +137,6 @@ where
outbound_finished: false, outbound_finished: false,
address: Addr::Future(multiaddr_future), address: Addr::Future(multiaddr_future),
outbound_substreams: SmallVec::new(), outbound_substreams: SmallVec::new(),
to_notify: None,
} }
} }
@ -171,10 +167,6 @@ where
let raw = self.muxer.open_outbound(); let raw = self.muxer.open_outbound();
self.outbound_substreams.push((user_data, raw)); self.outbound_substreams.push((user_data, raw));
if let Some(task) = self.to_notify.take() {
task.notify();
}
Ok(()) Ok(())
} }
@ -287,7 +279,6 @@ where
} }
// Nothing happened. Register our task to be notified and return. // Nothing happened. Register our task to be notified and return.
self.to_notify = Some(task::current());
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }

View File

@ -167,7 +167,6 @@ where TSocket: AsyncRead + AsyncWrite,
sent_pings: VecDeque::with_capacity(4), sent_pings: VecDeque::with_capacity(4),
rng: EntropyRng::default(), rng: EntropyRng::default(),
pings_to_send: VecDeque::with_capacity(4), pings_to_send: VecDeque::with_capacity(4),
to_notify: None,
}; };
PingOutput::Pinger(dialer) PingOutput::Pinger(dialer)
@ -201,8 +200,6 @@ pub struct PingDialer<TSocket, TUserData> {
rng: EntropyRng, rng: EntropyRng,
/// List of pings to send to the remote. /// List of pings to send to the remote.
pings_to_send: VecDeque<(Bytes, TUserData)>, pings_to_send: VecDeque<(Bytes, TUserData)>,
/// Task to notify when we add an element to `pings_to_send`.
to_notify: Option<task::Task>,
} }
impl<TSocket, TUserData> PingDialer<TSocket, TUserData> { impl<TSocket, TUserData> PingDialer<TSocket, TUserData> {
@ -213,9 +210,6 @@ impl<TSocket, TUserData> PingDialer<TSocket, TUserData> {
let payload: [u8; 32] = self.rng.sample(Standard); let payload: [u8; 32] = self.rng.sample(Standard);
debug!("Preparing for ping with payload {:?}", payload); debug!("Preparing for ping with payload {:?}", payload);
self.pings_to_send.push_back((Bytes::from(payload.to_vec()), user_data)); self.pings_to_send.push_back((Bytes::from(payload.to_vec()), user_data));
if let Some(to_notify) = self.to_notify.take() {
to_notify.notify();
}
} }
} }
@ -234,8 +228,6 @@ where TSocket: AsyncRead + AsyncWrite,
} }
} }
self.to_notify = Some(task::current());
while let Some((ping, user_data)) = self.pings_to_send.pop_front() { while let Some((ping, user_data)) = self.pings_to_send.pop_front() {
match self.inner.start_send(ping.clone()) { match self.inner.start_send(ping.clone()) {
Ok(AsyncSink::Ready) => self.need_writer_flush = true, Ok(AsyncSink::Ready) => self.need_writer_flush = true,