Don't allow handlers::poll() to return None (#811)

This commit is contained in:
Pierre Krieger 2019-01-02 14:22:23 +01:00 committed by GitHub
parent f903e2b744
commit 2c2fc8bfd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 97 additions and 82 deletions

View File

@ -68,7 +68,7 @@ pub trait NodeHandler {
fn inject_event(&mut self, event: Self::InEvent); fn inject_event(&mut self, event: Self::InEvent);
/// Indicates to the node that it should shut down. After that, it is expected that `poll()` /// Indicates to the node that it should shut down. After that, it is expected that `poll()`
/// returns `Ready(None)` as soon as possible. /// returns `Ready(NodeHandlerEvent::Shutdown)` as soon as possible.
/// ///
/// This method allows an implementation to perform a graceful shutdown of the substreams, and /// This method allows an implementation to perform a graceful shutdown of the substreams, and
/// send back various events. /// send back various events.
@ -76,7 +76,7 @@ pub trait NodeHandler {
/// Should behave like `Stream::poll()`. Should close if no more event can be produced and the /// Should behave like `Stream::poll()`. Should close if no more event can be produced and the
/// node should be closed. /// node should be closed.
fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>>, Self::Error>; fn poll(&mut self) -> Poll<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>;
} }
/// Endpoint for a received substream. /// Endpoint for a received substream.
@ -112,6 +112,9 @@ pub enum NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
/// Require a new outbound substream to be opened with the remote. /// Require a new outbound substream to be opened with the remote.
OutboundSubstreamRequest(TOutboundOpenInfo), OutboundSubstreamRequest(TOutboundOpenInfo),
/// Gracefully shut down the connection to the node.
Shutdown,
/// Other event. /// Other event.
Custom(TCustom), Custom(TCustom),
} }
@ -127,6 +130,7 @@ impl<TOutboundOpenInfo, TCustom> NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
NodeHandlerEvent::OutboundSubstreamRequest(val) => { NodeHandlerEvent::OutboundSubstreamRequest(val) => {
NodeHandlerEvent::OutboundSubstreamRequest(map(val)) NodeHandlerEvent::OutboundSubstreamRequest(map(val))
}, },
NodeHandlerEvent::Shutdown => NodeHandlerEvent::Shutdown,
NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(val), NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(val),
} }
} }
@ -140,6 +144,7 @@ impl<TOutboundOpenInfo, TCustom> NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
NodeHandlerEvent::OutboundSubstreamRequest(val) => { NodeHandlerEvent::OutboundSubstreamRequest(val) => {
NodeHandlerEvent::OutboundSubstreamRequest(val) NodeHandlerEvent::OutboundSubstreamRequest(val)
}, },
NodeHandlerEvent::Shutdown => NodeHandlerEvent::Shutdown,
NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(map(val)), NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(map(val)),
} }
} }
@ -283,13 +288,13 @@ where
} }
} }
match if self.handler_is_done { Async::Ready(None) } else { self.handler.poll().map_err(HandledNodeError::Handler)? } { match if self.handler_is_done { Async::Ready(NodeHandlerEvent::Shutdown) } else { self.handler.poll().map_err(HandledNodeError::Handler)? } {
Async::NotReady => { Async::NotReady => {
if node_not_ready { if node_not_ready {
break break
} }
} }
Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(user_data))) => { Async::Ready(NodeHandlerEvent::OutboundSubstreamRequest(user_data)) => {
if self.node.get_ref().is_outbound_open() { if self.node.get_ref().is_outbound_open() {
match self.node.get_mut().open_substream(user_data) { match self.node.get_mut().open_substream(user_data) {
Ok(()) => (), Ok(()) => (),
@ -301,10 +306,10 @@ where
self.handler.inject_outbound_closed(user_data); self.handler.inject_outbound_closed(user_data);
} }
} }
Async::Ready(Some(NodeHandlerEvent::Custom(event))) => { Async::Ready(NodeHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(Some(event))); return Ok(Async::Ready(Some(event)));
} }
Async::Ready(None) => { Async::Ready(NodeHandlerEvent::Shutdown) => {
self.handler_is_done = true; self.handler_is_done = true;
if !self.is_shutting_down { if !self.is_shutting_down {
self.is_shutting_down = true; self.is_shutting_down = true;
@ -440,12 +445,12 @@ mod tests {
assert!(self.substream_attempt_cancelled); assert!(self.substream_attempt_cancelled);
self.shutdown_called = true; self.shutdown_called = true;
} }
fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<(), ()>>, io::Error> { fn poll(&mut self) -> Poll<NodeHandlerEvent<(), ()>, io::Error> {
if self.shutdown_called { if self.shutdown_called {
Ok(Async::Ready(None)) Ok(Async::Ready(NodeHandlerEvent::Shutdown))
} else if !self.did_substream_attempt { } else if !self.did_substream_attempt {
self.did_substream_attempt = true; self.did_substream_attempt = true;
Ok(Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(())))) Ok(Async::Ready(NodeHandlerEvent::OutboundSubstreamRequest(())))
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
} }

View File

@ -97,11 +97,11 @@ where
fn poll( fn poll(
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Void, Void,
> { > {
if self.shutting_down { if self.shutting_down {
Ok(Async::Ready(None)) Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
} }

View File

@ -108,7 +108,7 @@ where
fn poll( fn poll(
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error, Self::Error,
> { > {
self.inner.poll() self.inner.poll()

View File

@ -103,16 +103,17 @@ where
fn poll( fn poll(
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error, Self::Error,
> { > {
Ok(self.inner.poll()?.map(|ev| { Ok(self.inner.poll()?.map(|ev| {
ev.map(|ev| match ev { match ev {
ProtocolsHandlerEvent::Custom(ev) => ProtocolsHandlerEvent::Custom((self.map)(ev)), ProtocolsHandlerEvent::Custom(ev) => ProtocolsHandlerEvent::Custom((self.map)(ev)),
ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown,
ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => {
ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info }
} }
}) }
})) }))
} }
} }

View File

@ -62,7 +62,8 @@ mod select;
/// ///
/// Implementors of this trait should keep in mind that the connection can be closed at any time. /// Implementors of this trait should keep in mind that the connection can be closed at any time.
/// When a connection is closed (either by us or by the remote) `shutdown()` is called and the /// When a connection is closed (either by us or by the remote) `shutdown()` is called and the
/// handler continues to be processed until it produces `None`. Only then the handler is destroyed. /// handler continues to be processed until it produces `ProtocolsHandlerEvent::Shutdown`. Only
/// then the handler is destroyed.
/// ///
/// This makes it possible for the handler to finish delivering events even after knowing that it /// This makes it possible for the handler to finish delivering events even after knowing that it
/// is shutting down. /// is shutting down.
@ -146,7 +147,7 @@ pub trait ProtocolsHandler {
/// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns /// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns
/// > `Ok(Async::Ready(None))`, all the other handlers will receive a call to /// > `Ok(Async::Ready(None))`, all the other handlers will receive a call to
/// > `shutdown()` and will eventually be closed and destroyed. /// > `shutdown()` and will eventually be closed and destroyed.
fn poll(&mut self) -> Poll<Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, Self::Error>; fn poll(&mut self) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>;
/// Adds a closure that turns the input event into something else. /// Adds a closure that turns the input event into something else.
#[inline] #[inline]
@ -211,6 +212,11 @@ pub enum ProtocolsHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom> {
info: TOutboundOpenInfo, info: TOutboundOpenInfo,
}, },
/// Perform a graceful shutdown of the connection to the remote.
///
/// Should be returned after `shutdown()` has been called.
Shutdown,
/// Other event. /// Other event.
Custom(TCustom), Custom(TCustom),
} }
@ -235,6 +241,7 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
info: map(info), info: map(info),
} }
} }
ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown,
ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val), ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val),
} }
} }
@ -255,6 +262,7 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
info, info,
} }
} }
ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown,
ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val), ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val),
} }
} }
@ -272,6 +280,7 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => {
ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info }
} }
ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown,
ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(map(val)), ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(map(val)),
} }
} }

View File

@ -198,7 +198,7 @@ where
self.handler.shutdown(); self.handler.shutdown();
} }
fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>>, Self::Error> { fn poll(&mut self) -> Poll<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error> {
// Continue negotiation of newly-opened substreams on the listening side. // Continue negotiation of newly-opened substreams on the listening side.
// We remove each element from `negotiating_in` one by one and add them back if not ready. // We remove each element from `negotiating_in` one by one and add them back if not ready.
for n in (0..self.negotiating_in.len()).rev() { for n in (0..self.negotiating_in.len()).rev() {
@ -244,21 +244,23 @@ where
// Poll the handler at the end so that we see the consequences of the method calls on // Poll the handler at the end so that we see the consequences of the method calls on
// `self.handler`. // `self.handler`.
match self.handler.poll()? { match self.handler.poll()? {
Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => { Async::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(Some(NodeHandlerEvent::Custom(event)))); return Ok(Async::Ready(NodeHandlerEvent::Custom(event)));
} }
Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade, upgrade,
info, info,
})) => { }) => {
let id = self.unique_dial_upgrade_id; let id = self.unique_dial_upgrade_id;
self.unique_dial_upgrade_id += 1; self.unique_dial_upgrade_id += 1;
self.queued_dial_upgrades.push((id, upgrade)); self.queued_dial_upgrades.push((id, upgrade));
return Ok(Async::Ready(Some( return Ok(Async::Ready(
NodeHandlerEvent::OutboundSubstreamRequest((id, info)), NodeHandlerEvent::OutboundSubstreamRequest((id, info)),
))); ));
} }
Async::Ready(None) => return Ok(Async::Ready(None)), Async::Ready(ProtocolsHandlerEvent::Shutdown) => {
return Ok(Async::Ready(NodeHandlerEvent::Shutdown))
},
Async::NotReady => (), Async::NotReady => (),
}; };

View File

@ -161,32 +161,36 @@ where
self.proto2.shutdown(); self.proto2.shutdown();
} }
fn poll(&mut self) -> Poll<Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, Self::Error> { fn poll(&mut self) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, Self::Error> {
match self.proto1.poll().map_err(EitherError::A)? { match self.proto1.poll().map_err(EitherError::A)? {
Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => { Async::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::First(event))))); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::First(event))));
}, },
Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info})) => { Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info}) => {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: EitherUpgrade::A(upgrade), upgrade: EitherUpgrade::A(upgrade),
info: EitherOutput::First(info), info: EitherOutput::First(info),
}))); }));
},
Async::Ready(ProtocolsHandlerEvent::Shutdown) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))
}, },
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => () Async::NotReady => ()
}; };
match self.proto2.poll().map_err(EitherError::B)? { match self.proto2.poll().map_err(EitherError::B)? {
Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => { Async::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event))))); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event))));
}, },
Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info })) => { Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info }) => {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: EitherUpgrade::B(upgrade), upgrade: EitherUpgrade::B(upgrade),
info: EitherOutput::Second(info), info: EitherOutput::Second(info),
}))); }));
},
Async::Ready(ProtocolsHandlerEvent::Shutdown) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))
}, },
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => () Async::NotReady => ()
}; };

View File

@ -130,12 +130,12 @@ impl NodeHandler for Handler {
fn shutdown(&mut self) { fn shutdown(&mut self) {
self.state = Some(HandlerState::Ready(None)); self.state = Some(HandlerState::Ready(None));
} }
fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<usize, OutEvent>>, IoError> { fn poll(&mut self) -> Poll<NodeHandlerEvent<usize, OutEvent>, IoError> {
match self.state.take() { match self.state.take() {
Some(ref state) => match state { Some(ref state) => match state {
HandlerState::NotReady => Ok(Async::NotReady), HandlerState::NotReady => Ok(Async::NotReady),
HandlerState::Ready(None) => Ok(Async::Ready(None)), HandlerState::Ready(None) => Ok(Async::Ready(NodeHandlerEvent::Shutdown)),
HandlerState::Ready(Some(event)) => Ok(Async::Ready(Some(event.clone()))), HandlerState::Ready(Some(event)) => Ok(Async::Ready(event.clone())),
HandlerState::Err => Err(io::Error::new(io::ErrorKind::Other, "oh noes")), HandlerState::Err => Err(io::Error::new(io::ErrorKind::Other, "oh noes")),
}, },
None => Ok(Async::NotReady), None => Ok(Async::NotReady),

View File

@ -160,17 +160,17 @@ where
fn poll( fn poll(
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
io::Error, io::Error,
> { > {
if !self.send_queue.is_empty() { if !self.send_queue.is_empty() {
let message = self.send_queue.remove(0); let message = self.send_queue.remove(0);
return Ok(Async::Ready(Some( return Ok(Async::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest { ProtocolsHandlerEvent::OutboundSubstreamRequest {
info: message, info: message,
upgrade: self.config.clone(), upgrade: self.config.clone(),
}, },
))); ));
} }
for n in (0..self.substreams.len()).rev() { for n in (0..self.substreams.len()).rev() {
@ -181,7 +181,7 @@ where
Ok(Async::Ready(Some(message))) => { Ok(Async::Ready(Some(message))) => {
self.substreams self.substreams
.push(SubstreamState::WaitingInput(substream)); .push(SubstreamState::WaitingInput(substream));
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(message)))); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(message)));
} }
Ok(Async::Ready(None)) => SubstreamState::Closing(substream), Ok(Async::Ready(None)) => SubstreamState::Closing(substream),
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
@ -217,7 +217,7 @@ where
self.substreams.push(SubstreamState::Closing(substream)); self.substreams.push(SubstreamState::Closing(substream));
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
Err(_) => return Ok(Async::Ready(None)), Err(_) => return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)),
}, },
} }
} }

View File

@ -97,23 +97,21 @@ where
fn poll( fn poll(
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<
ProtocolsHandlerEvent< ProtocolsHandlerEvent<
Self::OutboundProtocol, Self::OutboundProtocol,
Self::OutboundOpenInfo, Self::OutboundOpenInfo,
Self::OutEvent, Self::OutEvent,
>, >,
>,
Self::Error, Self::Error,
> { > {
if !self.pending_result.is_empty() { if !self.pending_result.is_empty() {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom( return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
self.pending_result.remove(0), self.pending_result.remove(0),
)))); )));
} }
if self.shutdown { if self.shutdown {
Ok(Async::Ready(None)) Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
} }

View File

@ -126,24 +126,22 @@ where
fn poll( fn poll(
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<
ProtocolsHandlerEvent< ProtocolsHandlerEvent<
Self::OutboundProtocol, Self::OutboundProtocol,
Self::OutboundOpenInfo, Self::OutboundOpenInfo,
PeriodicIdHandlerEvent, PeriodicIdHandlerEvent,
>, >,
>,
Self::Error, Self::Error,
> { > {
if let Some(pending_result) = self.pending_result.take() { if let Some(pending_result) = self.pending_result.take() {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom( return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
pending_result, pending_result,
)))); )));
} }
let next_id = match self.next_id { let next_id = match self.next_id {
Some(ref mut nid) => nid, Some(ref mut nid) => nid,
None => return Ok(Async::Ready(None)), None => return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)),
}; };
// Poll the future that fires when we need to identify the node again. // Poll the future that fires when we need to identify the node again.
@ -153,7 +151,7 @@ where
next_id.reset(Instant::now() + DELAY_TO_NEXT_ID); next_id.reset(Instant::now() + DELAY_TO_NEXT_ID);
let upgrade = self.config.clone(); let upgrade = self.config.clone();
let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () }; let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () };
Ok(Async::Ready(Some(ev))) Ok(Async::Ready(ev))
} }
} }
} }

View File

@ -497,9 +497,7 @@ where
fn poll( fn poll(
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
>,
io::Error, io::Error,
> { > {
// Special case if shutting down. // Special case if shutting down.
@ -512,7 +510,7 @@ where
} }
if self.substreams.is_empty() { if self.substreams.is_empty() {
return Ok(Async::Ready(None)); return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown));
} else { } else {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
@ -526,10 +524,10 @@ where
match advance_substream(substream, self.config) { match advance_substream(substream, self.config) {
(Some(new_state), Some(event), _) => { (Some(new_state), Some(event), _) => {
self.substreams.push(new_state); self.substreams.push(new_state);
return Ok(Async::Ready(Some(event))); return Ok(Async::Ready(event));
} }
(None, Some(event), _) => { (None, Some(event), _) => {
return Ok(Async::Ready(Some(event))); return Ok(Async::Ready(event));
} }
(Some(new_state), None, false) => { (Some(new_state), None, false) => {
self.substreams.push(new_state); self.substreams.push(new_state);

View File

@ -214,7 +214,7 @@ where
fn poll( fn poll(
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
io::Error, io::Error,
> { > {
// Shortcut for polling a `tokio_timer::Delay` // Shortcut for polling a `tokio_timer::Delay`
@ -234,7 +234,7 @@ where
match mem::replace(&mut self.out_state, OutState::Poisoned) { match mem::replace(&mut self.out_state, OutState::Poisoned) {
OutState::Shutdown | OutState::Poisoned => { OutState::Shutdown | OutState::Poisoned => {
// This shuts down the whole connection with the remote. // This shuts down the whole connection with the remote.
Ok(Async::Ready(None)) Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))
}, },
OutState::Disabled => { OutState::Disabled => {
@ -246,12 +246,12 @@ where
// Note that we ignore the expiration here, as it's pretty unlikely to happen. // Note that we ignore the expiration here, as it's pretty unlikely to happen.
// The expiration is only here to be transmitted to the `Upgrading`. // The expiration is only here to be transmitted to the `Upgrading`.
self.out_state = OutState::Upgrading { expires }; self.out_state = OutState::Upgrading { expires };
Ok(Async::Ready(Some( Ok(Async::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest { ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: self.ping_config, upgrade: self.ping_config,
info: (), info: (),
}, },
))) ))
} }
// Waiting for the upgrade to be negotiated. // Waiting for the upgrade to be negotiated.
@ -263,7 +263,7 @@ where
Ready => { Ready => {
self.out_state = OutState::Shutdown; self.out_state = OutState::Shutdown;
let ev = OutEvent::Unresponsive; let ev = OutEvent::Unresponsive;
Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev)))) Ok(Async::Ready(ProtocolsHandlerEvent::Custom(ev)))
}, },
}), }),
@ -278,12 +278,12 @@ where
next_ping: Delay::new(Instant::now() + self.delay_to_next_ping), next_ping: Delay::new(Instant::now() + self.delay_to_next_ping),
}; };
let ev = OutEvent::PingSuccess(started.elapsed()); let ev = OutEvent::PingSuccess(started.elapsed());
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev)))); return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(ev)));
} }
Async::NotReady => {} Async::NotReady => {}
Async::Ready(None) => { Async::Ready(None) => {
self.out_state = OutState::Shutdown; self.out_state = OutState::Shutdown;
return Ok(Async::Ready(None)); return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown));
} }
} }
@ -298,7 +298,7 @@ where
Ready => { Ready => {
self.out_state = OutState::Shutdown; self.out_state = OutState::Shutdown;
let ev = OutEvent::Unresponsive; let ev = OutEvent::Unresponsive;
Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev)))) Ok(Async::Ready(ProtocolsHandlerEvent::Custom(ev)))
}, },
}) })
} }
@ -314,7 +314,7 @@ where
let expires = Delay::new(Instant::now() + self.ping_timeout); let expires = Delay::new(Instant::now() + self.ping_timeout);
substream.ping(Instant::now()); substream.ping(Instant::now());
self.out_state = OutState::WaitingForPong { substream, expires }; self.out_state = OutState::WaitingForPong { substream, expires };
Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(OutEvent::PingStart)))) Ok(Async::Ready(ProtocolsHandlerEvent::Custom(OutEvent::PingStart)))
}, },
}) })
} }

View File

@ -117,7 +117,7 @@ where
fn poll( fn poll(
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error, Self::Error,
> { > {
// Removes each substream one by one, and pushes them back if they're not ready (which // Removes each substream one by one, and pushes them back if they're not ready (which
@ -133,7 +133,7 @@ where
// Special case if shutting down. // Special case if shutting down.
if self.shutdown && self.ping_in_substreams.is_empty() { if self.shutdown && self.ping_in_substreams.is_empty() {
return Ok(Async::Ready(None)); return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown));
} }
Ok(Async::NotReady) Ok(Async::NotReady)