diff --git a/core/src/nodes/handled_node.rs b/core/src/nodes/handled_node.rs index 17220ecb..643a301a 100644 --- a/core/src/nodes/handled_node.rs +++ b/core/src/nodes/handled_node.rs @@ -68,7 +68,7 @@ pub trait NodeHandler { fn inject_event(&mut self, event: Self::InEvent); /// Indicates to the node that it should shut down. After that, it is expected that `poll()` - /// returns `Ready(None)` as soon as possible. + /// returns `Ready(NodeHandlerEvent::Shutdown)` as soon as possible. /// /// This method allows an implementation to perform a graceful shutdown of the substreams, and /// send back various events. @@ -76,7 +76,7 @@ pub trait NodeHandler { /// Should behave like `Stream::poll()`. Should close if no more event can be produced and the /// node should be closed. - fn poll(&mut self) -> Poll>, Self::Error>; + fn poll(&mut self) -> Poll, Self::Error>; } /// Endpoint for a received substream. @@ -112,6 +112,9 @@ pub enum NodeHandlerEvent { /// Require a new outbound substream to be opened with the remote. OutboundSubstreamRequest(TOutboundOpenInfo), + /// Gracefully shut down the connection to the node. + Shutdown, + /// Other event. Custom(TCustom), } @@ -127,6 +130,7 @@ impl NodeHandlerEvent { NodeHandlerEvent::OutboundSubstreamRequest(val) => { NodeHandlerEvent::OutboundSubstreamRequest(map(val)) }, + NodeHandlerEvent::Shutdown => NodeHandlerEvent::Shutdown, NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(val), } } @@ -140,6 +144,7 @@ impl NodeHandlerEvent { NodeHandlerEvent::OutboundSubstreamRequest(val) => { NodeHandlerEvent::OutboundSubstreamRequest(val) }, + NodeHandlerEvent::Shutdown => NodeHandlerEvent::Shutdown, NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(map(val)), } } @@ -283,13 +288,13 @@ where } } - match if self.handler_is_done { Async::Ready(None) } else { self.handler.poll().map_err(HandledNodeError::Handler)? } { + match if self.handler_is_done { Async::Ready(NodeHandlerEvent::Shutdown) } else { self.handler.poll().map_err(HandledNodeError::Handler)? } { Async::NotReady => { if node_not_ready { break } } - Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(user_data))) => { + Async::Ready(NodeHandlerEvent::OutboundSubstreamRequest(user_data)) => { if self.node.get_ref().is_outbound_open() { match self.node.get_mut().open_substream(user_data) { Ok(()) => (), @@ -301,10 +306,10 @@ where self.handler.inject_outbound_closed(user_data); } } - Async::Ready(Some(NodeHandlerEvent::Custom(event))) => { + Async::Ready(NodeHandlerEvent::Custom(event)) => { return Ok(Async::Ready(Some(event))); } - Async::Ready(None) => { + Async::Ready(NodeHandlerEvent::Shutdown) => { self.handler_is_done = true; if !self.is_shutting_down { self.is_shutting_down = true; @@ -440,12 +445,12 @@ mod tests { assert!(self.substream_attempt_cancelled); self.shutdown_called = true; } - fn poll(&mut self) -> Poll>, io::Error> { + fn poll(&mut self) -> Poll, io::Error> { if self.shutdown_called { - Ok(Async::Ready(None)) + Ok(Async::Ready(NodeHandlerEvent::Shutdown)) } else if !self.did_substream_attempt { self.did_substream_attempt = true; - Ok(Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(())))) + Ok(Async::Ready(NodeHandlerEvent::OutboundSubstreamRequest(()))) } else { Ok(Async::NotReady) } diff --git a/core/src/protocols_handler/dummy.rs b/core/src/protocols_handler/dummy.rs index f400fddc..ee8dafb4 100644 --- a/core/src/protocols_handler/dummy.rs +++ b/core/src/protocols_handler/dummy.rs @@ -97,11 +97,11 @@ where fn poll( &mut self, ) -> Poll< - Option>, + ProtocolsHandlerEvent, Void, > { if self.shutting_down { - Ok(Async::Ready(None)) + Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) } else { Ok(Async::NotReady) } diff --git a/core/src/protocols_handler/map_in.rs b/core/src/protocols_handler/map_in.rs index 3a4861a8..ae62d9fe 100644 --- a/core/src/protocols_handler/map_in.rs +++ b/core/src/protocols_handler/map_in.rs @@ -108,7 +108,7 @@ where fn poll( &mut self, ) -> Poll< - Option>, + ProtocolsHandlerEvent, Self::Error, > { self.inner.poll() diff --git a/core/src/protocols_handler/map_out.rs b/core/src/protocols_handler/map_out.rs index 98103fc2..defa03a0 100644 --- a/core/src/protocols_handler/map_out.rs +++ b/core/src/protocols_handler/map_out.rs @@ -103,16 +103,17 @@ where fn poll( &mut self, ) -> Poll< - Option>, + ProtocolsHandlerEvent, Self::Error, > { Ok(self.inner.poll()?.map(|ev| { - ev.map(|ev| match ev { + match ev { ProtocolsHandlerEvent::Custom(ev) => ProtocolsHandlerEvent::Custom((self.map)(ev)), + ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown, ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } } - }) + } })) } } diff --git a/core/src/protocols_handler/mod.rs b/core/src/protocols_handler/mod.rs index 2889141d..6127b68a 100644 --- a/core/src/protocols_handler/mod.rs +++ b/core/src/protocols_handler/mod.rs @@ -62,7 +62,8 @@ mod select; /// /// Implementors of this trait should keep in mind that the connection can be closed at any time. /// When a connection is closed (either by us or by the remote) `shutdown()` is called and the -/// handler continues to be processed until it produces `None`. Only then the handler is destroyed. +/// handler continues to be processed until it produces `ProtocolsHandlerEvent::Shutdown`. Only +/// then the handler is destroyed. /// /// This makes it possible for the handler to finish delivering events even after knowing that it /// is shutting down. @@ -146,7 +147,7 @@ pub trait ProtocolsHandler { /// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns /// > `Ok(Async::Ready(None))`, all the other handlers will receive a call to /// > `shutdown()` and will eventually be closed and destroyed. - fn poll(&mut self) -> Poll>, Self::Error>; + fn poll(&mut self) -> Poll, Self::Error>; /// Adds a closure that turns the input event into something else. #[inline] @@ -211,6 +212,11 @@ pub enum ProtocolsHandlerEvent { info: TOutboundOpenInfo, }, + /// Perform a graceful shutdown of the connection to the remote. + /// + /// Should be returned after `shutdown()` has been called. + Shutdown, + /// Other event. Custom(TCustom), } @@ -235,6 +241,7 @@ impl info: map(info), } } + ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown, ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val), } } @@ -255,6 +262,7 @@ impl info, } } + ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown, ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val), } } @@ -272,6 +280,7 @@ impl ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } } + ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown, ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(map(val)), } } diff --git a/core/src/protocols_handler/node_handler.rs b/core/src/protocols_handler/node_handler.rs index 8f709090..ad321b81 100644 --- a/core/src/protocols_handler/node_handler.rs +++ b/core/src/protocols_handler/node_handler.rs @@ -198,7 +198,7 @@ where self.handler.shutdown(); } - fn poll(&mut self) -> Poll>, Self::Error> { + fn poll(&mut self) -> Poll, Self::Error> { // Continue negotiation of newly-opened substreams on the listening side. // We remove each element from `negotiating_in` one by one and add them back if not ready. for n in (0..self.negotiating_in.len()).rev() { @@ -244,21 +244,23 @@ where // Poll the handler at the end so that we see the consequences of the method calls on // `self.handler`. match self.handler.poll()? { - Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => { - return Ok(Async::Ready(Some(NodeHandlerEvent::Custom(event)))); + Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { + return Ok(Async::Ready(NodeHandlerEvent::Custom(event))); } - Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { + Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info, - })) => { + }) => { let id = self.unique_dial_upgrade_id; self.unique_dial_upgrade_id += 1; self.queued_dial_upgrades.push((id, upgrade)); - return Ok(Async::Ready(Some( + return Ok(Async::Ready( NodeHandlerEvent::OutboundSubstreamRequest((id, info)), - ))); + )); } - Async::Ready(None) => return Ok(Async::Ready(None)), + Async::Ready(ProtocolsHandlerEvent::Shutdown) => { + return Ok(Async::Ready(NodeHandlerEvent::Shutdown)) + }, Async::NotReady => (), }; diff --git a/core/src/protocols_handler/select.rs b/core/src/protocols_handler/select.rs index 86a27f8d..d010771b 100644 --- a/core/src/protocols_handler/select.rs +++ b/core/src/protocols_handler/select.rs @@ -161,32 +161,36 @@ where self.proto2.shutdown(); } - fn poll(&mut self) -> Poll>, Self::Error> { + fn poll(&mut self) -> Poll, Self::Error> { match self.proto1.poll().map_err(EitherError::A)? { - Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => { - return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::First(event))))); + Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::First(event)))); }, - Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info})) => { - return Ok(Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { + Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info}) => { + return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade: EitherUpgrade::A(upgrade), info: EitherOutput::First(info), - }))); + })); + }, + Async::Ready(ProtocolsHandlerEvent::Shutdown) => { + return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) }, - Async::Ready(None) => return Ok(Async::Ready(None)), Async::NotReady => () }; match self.proto2.poll().map_err(EitherError::B)? { - Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => { - return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event))))); + Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event)))); }, - Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info })) => { - return Ok(Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { + Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info }) => { + return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade: EitherUpgrade::B(upgrade), info: EitherOutput::Second(info), - }))); + })); + }, + Async::Ready(ProtocolsHandlerEvent::Shutdown) => { + return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) }, - Async::Ready(None) => return Ok(Async::Ready(None)), Async::NotReady => () }; diff --git a/core/src/tests/dummy_handler.rs b/core/src/tests/dummy_handler.rs index d63d6edc..255df676 100644 --- a/core/src/tests/dummy_handler.rs +++ b/core/src/tests/dummy_handler.rs @@ -130,12 +130,12 @@ impl NodeHandler for Handler { fn shutdown(&mut self) { self.state = Some(HandlerState::Ready(None)); } - fn poll(&mut self) -> Poll>, IoError> { + fn poll(&mut self) -> Poll, IoError> { match self.state.take() { Some(ref state) => match state { HandlerState::NotReady => Ok(Async::NotReady), - HandlerState::Ready(None) => Ok(Async::Ready(None)), - HandlerState::Ready(Some(event)) => Ok(Async::Ready(Some(event.clone()))), + HandlerState::Ready(None) => Ok(Async::Ready(NodeHandlerEvent::Shutdown)), + HandlerState::Ready(Some(event)) => Ok(Async::Ready(event.clone())), HandlerState::Err => Err(io::Error::new(io::ErrorKind::Other, "oh noes")), }, None => Ok(Async::NotReady), diff --git a/protocols/floodsub/src/handler.rs b/protocols/floodsub/src/handler.rs index c084b8cf..f8a1fac4 100644 --- a/protocols/floodsub/src/handler.rs +++ b/protocols/floodsub/src/handler.rs @@ -160,17 +160,17 @@ where fn poll( &mut self, ) -> Poll< - Option>, + ProtocolsHandlerEvent, io::Error, > { if !self.send_queue.is_empty() { let message = self.send_queue.remove(0); - return Ok(Async::Ready(Some( + return Ok(Async::Ready( ProtocolsHandlerEvent::OutboundSubstreamRequest { info: message, upgrade: self.config.clone(), }, - ))); + )); } for n in (0..self.substreams.len()).rev() { @@ -181,7 +181,7 @@ where Ok(Async::Ready(Some(message))) => { self.substreams .push(SubstreamState::WaitingInput(substream)); - return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(message)))); + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(message))); } Ok(Async::Ready(None)) => SubstreamState::Closing(substream), Ok(Async::NotReady) => { @@ -217,7 +217,7 @@ where self.substreams.push(SubstreamState::Closing(substream)); return Ok(Async::NotReady); } - Err(_) => return Ok(Async::Ready(None)), + Err(_) => return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)), }, } } diff --git a/protocols/identify/src/listen_handler.rs b/protocols/identify/src/listen_handler.rs index 650b4780..c05a3926 100644 --- a/protocols/identify/src/listen_handler.rs +++ b/protocols/identify/src/listen_handler.rs @@ -97,23 +97,21 @@ where fn poll( &mut self, ) -> Poll< - Option< - ProtocolsHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - >, + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, >, Self::Error, > { if !self.pending_result.is_empty() { - return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom( + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( self.pending_result.remove(0), - )))); + ))); } if self.shutdown { - Ok(Async::Ready(None)) + Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) } else { Ok(Async::NotReady) } diff --git a/protocols/identify/src/periodic_id_handler.rs b/protocols/identify/src/periodic_id_handler.rs index ef0dd500..2cb1bbf1 100644 --- a/protocols/identify/src/periodic_id_handler.rs +++ b/protocols/identify/src/periodic_id_handler.rs @@ -126,24 +126,22 @@ where fn poll( &mut self, ) -> Poll< - Option< - ProtocolsHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - PeriodicIdHandlerEvent, - >, + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + PeriodicIdHandlerEvent, >, Self::Error, > { if let Some(pending_result) = self.pending_result.take() { - return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom( + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( pending_result, - )))); + ))); } let next_id = match self.next_id { Some(ref mut nid) => nid, - None => return Ok(Async::Ready(None)), + None => return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)), }; // Poll the future that fires when we need to identify the node again. @@ -153,7 +151,7 @@ where next_id.reset(Instant::now() + DELAY_TO_NEXT_ID); let upgrade = self.config.clone(); let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () }; - Ok(Async::Ready(Some(ev))) + Ok(Async::Ready(ev)) } } } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 20c78a83..7ef85081 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -497,9 +497,7 @@ where fn poll( &mut self, ) -> Poll< - Option< - ProtocolsHandlerEvent, - >, + ProtocolsHandlerEvent, io::Error, > { // Special case if shutting down. @@ -512,7 +510,7 @@ where } if self.substreams.is_empty() { - return Ok(Async::Ready(None)); + return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)); } else { return Ok(Async::NotReady); } @@ -526,10 +524,10 @@ where match advance_substream(substream, self.config) { (Some(new_state), Some(event), _) => { self.substreams.push(new_state); - return Ok(Async::Ready(Some(event))); + return Ok(Async::Ready(event)); } (None, Some(event), _) => { - return Ok(Async::Ready(Some(event))); + return Ok(Async::Ready(event)); } (Some(new_state), None, false) => { self.substreams.push(new_state); diff --git a/protocols/ping/src/dial_handler.rs b/protocols/ping/src/dial_handler.rs index abd7bfbe..dbe1375f 100644 --- a/protocols/ping/src/dial_handler.rs +++ b/protocols/ping/src/dial_handler.rs @@ -214,7 +214,7 @@ where fn poll( &mut self, ) -> Poll< - Option>, + ProtocolsHandlerEvent, io::Error, > { // Shortcut for polling a `tokio_timer::Delay` @@ -234,7 +234,7 @@ where match mem::replace(&mut self.out_state, OutState::Poisoned) { OutState::Shutdown | OutState::Poisoned => { // This shuts down the whole connection with the remote. - Ok(Async::Ready(None)) + Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) }, OutState::Disabled => { @@ -246,12 +246,12 @@ where // Note that we ignore the expiration here, as it's pretty unlikely to happen. // The expiration is only here to be transmitted to the `Upgrading`. self.out_state = OutState::Upgrading { expires }; - Ok(Async::Ready(Some( + Ok(Async::Ready( ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade: self.ping_config, info: (), }, - ))) + )) } // Waiting for the upgrade to be negotiated. @@ -263,7 +263,7 @@ where Ready => { self.out_state = OutState::Shutdown; let ev = OutEvent::Unresponsive; - Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev)))) + Ok(Async::Ready(ProtocolsHandlerEvent::Custom(ev))) }, }), @@ -278,12 +278,12 @@ where next_ping: Delay::new(Instant::now() + self.delay_to_next_ping), }; let ev = OutEvent::PingSuccess(started.elapsed()); - return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev)))); + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(ev))); } Async::NotReady => {} Async::Ready(None) => { self.out_state = OutState::Shutdown; - return Ok(Async::Ready(None)); + return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)); } } @@ -298,7 +298,7 @@ where Ready => { self.out_state = OutState::Shutdown; let ev = OutEvent::Unresponsive; - Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev)))) + Ok(Async::Ready(ProtocolsHandlerEvent::Custom(ev))) }, }) } @@ -314,7 +314,7 @@ where let expires = Delay::new(Instant::now() + self.ping_timeout); substream.ping(Instant::now()); self.out_state = OutState::WaitingForPong { substream, expires }; - Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(OutEvent::PingStart)))) + Ok(Async::Ready(ProtocolsHandlerEvent::Custom(OutEvent::PingStart))) }, }) } diff --git a/protocols/ping/src/listen_handler.rs b/protocols/ping/src/listen_handler.rs index 43c318a1..47fb26d1 100644 --- a/protocols/ping/src/listen_handler.rs +++ b/protocols/ping/src/listen_handler.rs @@ -117,7 +117,7 @@ where fn poll( &mut self, ) -> Poll< - Option>, + ProtocolsHandlerEvent, Self::Error, > { // Removes each substream one by one, and pushes them back if they're not ready (which @@ -133,7 +133,7 @@ where // Special case if shutting down. if self.shutdown && self.ping_in_substreams.is_empty() { - return Ok(Async::Ready(None)); + return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)); } Ok(Async::NotReady)