mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-14 10:31:21 +00:00
Fix the polling process in handled node (#582)
This commit is contained in:
@ -143,6 +143,8 @@ where
|
|||||||
node: Fuse<NodeStream<TMuxer, THandler::OutboundOpenInfo>>,
|
node: Fuse<NodeStream<TMuxer, THandler::OutboundOpenInfo>>,
|
||||||
/// Handler that processes substreams.
|
/// Handler that processes substreams.
|
||||||
handler: THandler,
|
handler: THandler,
|
||||||
|
/// If true, `handler` has returned `Ready(None)` and therefore shouldn't be polled again.
|
||||||
|
handler_is_done: bool,
|
||||||
// True, if the node is shutting down.
|
// True, if the node is shutting down.
|
||||||
is_shutting_down: bool
|
is_shutting_down: bool
|
||||||
}
|
}
|
||||||
@ -158,6 +160,7 @@ where
|
|||||||
HandledNode {
|
HandledNode {
|
||||||
node: NodeStream::new(muxer).fuse(),
|
node: NodeStream::new(muxer).fuse(),
|
||||||
handler,
|
handler,
|
||||||
|
handler_is_done: false,
|
||||||
is_shutting_down: false
|
is_shutting_down: false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -196,13 +199,11 @@ where
|
|||||||
/// After this method returns, `is_shutting_down()` should return true.
|
/// After this method returns, `is_shutting_down()` should return true.
|
||||||
pub fn shutdown(&mut self) {
|
pub fn shutdown(&mut self) {
|
||||||
self.node.get_mut().shutdown_all();
|
self.node.get_mut().shutdown_all();
|
||||||
self.is_shutting_down = true;
|
|
||||||
|
|
||||||
for user_data in self.node.get_mut().cancel_outgoing() {
|
for user_data in self.node.get_mut().cancel_outgoing() {
|
||||||
self.handler.inject_outbound_closed(user_data);
|
self.handler.inject_outbound_closed(user_data);
|
||||||
}
|
}
|
||||||
|
self.handler.shutdown();
|
||||||
self.handler.shutdown()
|
self.is_shutting_down = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -216,10 +217,14 @@ where
|
|||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
loop {
|
loop {
|
||||||
|
if self.node.is_done() && self.handler_is_done {
|
||||||
|
return Ok(Async::Ready(None));
|
||||||
|
}
|
||||||
|
|
||||||
let mut node_not_ready = false;
|
let mut node_not_ready = false;
|
||||||
|
|
||||||
match self.node.poll()? {
|
match self.node.poll()? {
|
||||||
Async::NotReady => (),
|
Async::NotReady => node_not_ready = true,
|
||||||
Async::Ready(Some(NodeEvent::InboundSubstream { substream })) => {
|
Async::Ready(Some(NodeEvent::InboundSubstream { substream })) => {
|
||||||
self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener)
|
self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener)
|
||||||
}
|
}
|
||||||
@ -228,8 +233,8 @@ where
|
|||||||
self.handler.inject_substream(substream, endpoint)
|
self.handler.inject_substream(substream, endpoint)
|
||||||
}
|
}
|
||||||
Async::Ready(None) => {
|
Async::Ready(None) => {
|
||||||
node_not_ready = true;
|
|
||||||
if !self.is_shutting_down {
|
if !self.is_shutting_down {
|
||||||
|
self.is_shutting_down = true;
|
||||||
self.handler.shutdown()
|
self.handler.shutdown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -241,7 +246,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.handler.poll()? {
|
match if self.handler_is_done { Async::Ready(None) } else { self.handler.poll()? } {
|
||||||
Async::NotReady => {
|
Async::NotReady => {
|
||||||
if node_not_ready {
|
if node_not_ready {
|
||||||
break
|
break
|
||||||
@ -261,7 +266,12 @@ where
|
|||||||
return Ok(Async::Ready(Some(event)));
|
return Ok(Async::Ready(Some(event)));
|
||||||
}
|
}
|
||||||
Async::Ready(None) => {
|
Async::Ready(None) => {
|
||||||
return Ok(Async::Ready(None))
|
self.handler_is_done = true;
|
||||||
|
if !self.is_shutting_down {
|
||||||
|
self.is_shutting_down = true;
|
||||||
|
self.node.get_mut().cancel_outgoing();
|
||||||
|
self.node.get_mut().shutdown_all();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user