[multistream-select] Reduce roundtrips in protocol negotiation. (#1212)

* Remove tokio-codec dependency from multistream-select.

In preparation for the eventual switch from tokio to std futures.

Includes some initial refactoring in preparation for further work
in the context of https://github.com/libp2p/rust-libp2p/issues/659.

* Reduce default buffer sizes.

* Allow more than one frame to be buffered for sending.

* Doc tweaks.

* Remove superfluous (duplicated) Message types.

* Reduce roundtrips in multistream-select negotiation.

1. Enable 0-RTT: If the dialer only supports a single protocol, it can send
   protocol data (e.g. the actual application request) together with
   the multistream-select header and protocol proposal. Similarly,
   if the listener supports a proposed protocol, it can send protocol
   data (e.g. the actual application response) together with the
   multistream-select header and protocol confirmation.

2. In general, the dialer "settles on" an expected protocol as soon
   as it runs out of alternatives. Furthermore, both dialer and listener
   do not immediately flush the final protocol confirmation, allowing it
   to be sent together with application protocol data. Attempts to read
   from the negotiated I/O stream implicitly flushes any pending data.

3. A clean / graceful shutdown of an I/O stream always completes protocol
   negotiation.

The publich API of multistream-select changed slightly, requiring both
AsyncRead and AsyncWrite bounds for async reading and writing due to
the implicit buffering and "lazy" negotiation. The error types have
also been changed, but they were not previously fully exported.

Includes some general refactoring with simplifications and some more tests,
e.g. there was an edge case relating to a possible ambiguity when parsing
multistream-select protocol messages.

* Further missing commentary.

* Remove unused test dependency.

* Adjust commentary.

* Cleanup NegotiatedComplete::poll()

* Fix deflate protocol tests.

* Stabilise network_simult test.

The test implicitly relied on "slow" connection establishment
in order to have a sufficient probability of passing.
With the removal of roundtrips in multistream-select, it is now
more likely that within the up to 50ms duration between swarm1
and swarm2 dialing, the connection is already established, causing
the expectation of step == 1 to fail when receiving a Connected event,
since the step may then still be 0.

This commit aims to avoid these spurious errors by detecting runs
during which a connection is established "too quickly", repeating
the test run.

It still seems theoretically possible that, if connections are always
established "too quickly", the test runs forever. However, given that
the delta between swarm1 and swarm2 dialing is 0-50ms and that the
TCP transport is used, that seems probabilistically unlikely.
Nevertheless, the purpose of the artificial dialing delay between
swarm1 and swarm2 should be re-evaluated and possibly at least
the maximum delay further reduced.

* Complete negotiation between upgrades in libp2p-core.

While multistream-select, as a standalone library and providing
an API at the granularity of a single negotiation, supports
lazy negotiation (and in particular 0-RTT negotiation), in the
context of libp2p-core where any number of negotiations are
composed generically within the concept of composable "upgrades",
it is necessary to wait for protocol negotiation between upgrades
to complete.

* Clarify docs. Simplify listener upgrades.

Since reading from a Negotiated I/O stream implicitly flushes any pending
negotiation data, there is no pitfall involved in not waiting for completion.
This commit is contained in:
Roman Borschel
2019-08-12 12:09:53 +02:00
committed by GitHub
parent 5696b3eb4d
commit 589d280bb5
23 changed files with 1631 additions and 1368 deletions

View File

@ -18,31 +18,41 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Contains the `dialer_select_proto` code, which allows selecting a protocol thanks to
//! `multistream-select` for the dialer.
//! Protocol negotiation strategies for the peer acting as the dialer.
use futures::{future::Either, prelude::*, stream::StreamFuture};
use crate::protocol::{Dialer, DialerFuture, Request, Response};
use log::trace;
use std::mem;
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, Version};
use futures::{future::Either, prelude::*};
use log::debug;
use std::{io, iter, mem, convert::TryFrom};
use tokio_io::{AsyncRead, AsyncWrite};
use crate::{Negotiated, ProtocolChoiceError};
use crate::{Negotiated, NegotiationError};
/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer
/// either sequentially or by considering all protocols in parallel.
pub type DialerSelectFuture<R, I> = Either<DialerSelectSeq<R, I>, DialerSelectPar<R, I>>;
/// Helps selecting a protocol amongst the ones supported.
/// Returns a `Future` that negotiates a protocol on the given I/O stream
/// for a peer acting as the _dialer_ (or _initiator_).
///
/// This function expects a socket and a list of protocols. It uses the `multistream-select`
/// protocol to choose with the remote a protocol amongst the ones produced by the iterator.
/// This function is given an I/O stream and a list of protocols and returns a
/// computation that performs the protocol negotiation with the remote. The
/// returned `Future` resolves with the name of the negotiated protocol and
/// a [`Negotiated`] I/O stream.
///
/// The iterator must produce a tuple of a protocol name advertised to the remote, a function that
/// checks whether a protocol name matches the protocol, and a protocol "identifier" of type `P`
/// (you decide what `P` is). The parameters of the match function are the name proposed by the
/// remote, and the protocol name that we passed (so that you don't have to clone the name). On
/// success, the function returns the identifier (of type `P`), plus the socket which now uses that
/// chosen protocol.
/// The chosen message flow for protocol negotiation depends on the numbers
/// of supported protocols given. That is, this function delegates to
/// [`dialer_select_proto_serial`] or [`dialer_select_proto_parallel`]
/// based on the number of protocols given. The number of protocols is
/// determined through the `size_hint` of the given iterator and thus
/// an inaccurate size estimate may result in a suboptimal choice.
///
/// > **Note**: When multiple `DialerSelectFuture`s are composed, i.e. a
/// > dialer performs multiple, nested protocol negotiations with just a
/// > single supported protocol (0-RTT negotiations), a listener that
/// > does not support one of the intermediate protocols may still process
/// > the request data associated with a supported follow-up protocol.
/// > See \[[1]\]. To avoid this behaviour, a dialer should ensure completion
/// > of the previous negotiation before starting the next negotiation,
/// > which can be accomplished by waiting for the future returned by
/// > [`Negotiated::complete`] to resolve.
///
/// [1]: https://github.com/multiformats/go-multistream/issues/20
pub fn dialer_select_proto<R, I>(inner: R, protocols: I) -> DialerSelectFuture<R, I::IntoIter>
where
R: AsyncRead + AsyncWrite,
@ -58,176 +68,42 @@ where
}
}
/// Helps selecting a protocol amongst the ones supported.
/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer
/// either trying protocols in-order, or by requesting all protocols supported
/// by the remote upfront, from which the first protocol found in the dialer's
/// list of protocols is selected.
pub type DialerSelectFuture<R, I> = Either<DialerSelectSeq<R, I>, DialerSelectPar<R, I>>;
/// Returns a `Future` that negotiates a protocol on the given I/O stream.
///
/// Same as `dialer_select_proto`. Tries protocols one by one. The iterator doesn't need to produce
/// match functions, because it's not needed.
/// Just like [`dialer_select_proto`] but always using an iterative message flow,
/// trying the given list of supported protocols one-by-one.
///
/// This strategy is preferable if the dialer only supports a few protocols.
pub fn dialer_select_proto_serial<R, I>(inner: R, protocols: I) -> DialerSelectSeq<R, I::IntoIter>
where
R: AsyncRead + AsyncWrite,
I: IntoIterator,
I::Item: AsRef<[u8]>
{
let protocols = protocols.into_iter();
let protocols = protocols.into_iter().peekable();
DialerSelectSeq {
inner: DialerSelectSeqState::AwaitDialer {
dialer_fut: Dialer::dial(inner),
protocols
protocols,
state: SeqState::SendHeader {
io: MessageIO::new(inner)
}
}
}
/// Future, returned by `dialer_select_proto_serial` which selects a protocol
/// and dialer sequentially.
pub struct DialerSelectSeq<R, I>
where
R: AsyncRead + AsyncWrite,
I: Iterator,
I::Item: AsRef<[u8]>
{
inner: DialerSelectSeqState<R, I>
}
enum DialerSelectSeqState<R, I>
where
R: AsyncRead + AsyncWrite,
I: Iterator,
I::Item: AsRef<[u8]>
{
AwaitDialer {
dialer_fut: DialerFuture<R, I::Item>,
protocols: I
},
NextProtocol {
dialer: Dialer<R, I::Item>,
proto_name: I::Item,
protocols: I
},
FlushProtocol {
dialer: Dialer<R, I::Item>,
proto_name: I::Item,
protocols: I
},
AwaitProtocol {
stream: StreamFuture<Dialer<R, I::Item>>,
proto_name: I::Item,
protocols: I
},
Undefined
}
impl<R, I> Future for DialerSelectSeq<R, I>
where
R: AsyncRead + AsyncWrite,
I: Iterator,
I::Item: AsRef<[u8]> + Clone
{
type Item = (I::Item, Negotiated<R>);
type Error = ProtocolChoiceError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match mem::replace(&mut self.inner, DialerSelectSeqState::Undefined) {
DialerSelectSeqState::AwaitDialer { mut dialer_fut, mut protocols } => {
let dialer = match dialer_fut.poll()? {
Async::Ready(d) => d,
Async::NotReady => {
self.inner = DialerSelectSeqState::AwaitDialer { dialer_fut, protocols };
return Ok(Async::NotReady)
}
};
let proto_name = protocols.next().ok_or(ProtocolChoiceError::NoProtocolFound)?;
self.inner = DialerSelectSeqState::NextProtocol {
dialer,
protocols,
proto_name
}
}
DialerSelectSeqState::NextProtocol { mut dialer, protocols, proto_name } => {
trace!("sending {:?}", proto_name.as_ref());
let req = Request::Protocol { name: proto_name.clone() };
match dialer.start_send(req)? {
AsyncSink::Ready => {
self.inner = DialerSelectSeqState::FlushProtocol {
dialer,
proto_name,
protocols
}
}
AsyncSink::NotReady(_) => {
self.inner = DialerSelectSeqState::NextProtocol {
dialer,
protocols,
proto_name
};
return Ok(Async::NotReady)
}
}
}
DialerSelectSeqState::FlushProtocol { mut dialer, proto_name, protocols } => {
match dialer.poll_complete()? {
Async::Ready(()) => {
let stream = dialer.into_future();
self.inner = DialerSelectSeqState::AwaitProtocol {
stream,
proto_name,
protocols
}
}
Async::NotReady => {
self.inner = DialerSelectSeqState::FlushProtocol {
dialer,
proto_name,
protocols
};
return Ok(Async::NotReady)
}
}
}
DialerSelectSeqState::AwaitProtocol { mut stream, proto_name, mut protocols } => {
let (m, r) = match stream.poll() {
Ok(Async::Ready(x)) => x,
Ok(Async::NotReady) => {
self.inner = DialerSelectSeqState::AwaitProtocol {
stream,
proto_name,
protocols
};
return Ok(Async::NotReady)
}
Err((e, _)) => return Err(ProtocolChoiceError::from(e))
};
trace!("received {:?}", m);
match m.ok_or(ProtocolChoiceError::UnexpectedMessage)? {
Response::Protocol { ref name }
if name.as_ref() == proto_name.as_ref() =>
{
return Ok(Async::Ready((proto_name, Negotiated(r.into_inner()))))
}
Response::ProtocolNotAvailable => {
let proto_name = protocols.next()
.ok_or(ProtocolChoiceError::NoProtocolFound)?;
self.inner = DialerSelectSeqState::NextProtocol {
dialer: r,
protocols,
proto_name
}
}
_ => return Err(ProtocolChoiceError::UnexpectedMessage)
}
}
DialerSelectSeqState::Undefined =>
panic!("DialerSelectSeqState::poll called after completion")
}
}
}
}
/// Helps selecting a protocol amongst the ones supported.
/// Returns a `Future` that negotiates a protocol on the given I/O stream.
///
/// Same as `dialer_select_proto`. Queries the list of supported protocols from the remote, then
/// chooses the most appropriate one.
/// Just like [`dialer_select_proto`] but always using a message flow that first
/// requests all supported protocols from the remote, selecting the first
/// protocol from the given list of supported protocols that is supported
/// by the remote.
///
/// This strategy may be beneficial if the dialer supports many protocols
/// and it is unclear whether the remote supports one of the first few.
pub fn dialer_select_proto_parallel<R, I>(inner: R, protocols: I) -> DialerSelectPar<R, I::IntoIter>
where
R: AsyncRead + AsyncWrite,
@ -236,193 +112,217 @@ where
{
let protocols = protocols.into_iter();
DialerSelectPar {
inner: DialerSelectParState::AwaitDialer { dialer_fut: Dialer::dial(inner), protocols }
protocols,
state: ParState::SendHeader {
io: MessageIO::new(inner)
}
}
}
/// Future, returned by `dialer_select_proto_parallel`, which selects a protocol and dialer in
/// parallel, by first requesting the list of protocols supported by the remote endpoint and
/// then selecting the most appropriate one by applying a match predicate to the result.
/// A `Future` returned by [`dialer_select_proto_serial`] which negotiates
/// a protocol iteratively by considering one protocol after the other.
pub struct DialerSelectSeq<R, I>
where
R: AsyncRead + AsyncWrite,
I: Iterator,
I::Item: AsRef<[u8]>
{
// TODO: It would be nice if eventually N = I::Item = Protocol.
protocols: iter::Peekable<I>,
state: SeqState<R, I::Item>
}
enum SeqState<R, N>
where
R: AsyncRead + AsyncWrite,
N: AsRef<[u8]>
{
SendHeader { io: MessageIO<R>, },
SendProtocol { io: MessageIO<R>, protocol: N },
FlushProtocol { io: MessageIO<R>, protocol: N },
AwaitProtocol { io: MessageIO<R>, protocol: N },
Done
}
impl<R, I> Future for DialerSelectSeq<R, I>
where
R: AsyncRead + AsyncWrite,
I: Iterator,
I::Item: AsRef<[u8]>
{
type Item = (I::Item, Negotiated<R>);
type Error = NegotiationError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match mem::replace(&mut self.state, SeqState::Done) {
SeqState::SendHeader { mut io } => {
if io.start_send(Message::Header(Version::V1))?.is_not_ready() {
self.state = SeqState::SendHeader { io };
return Ok(Async::NotReady)
}
let protocol = self.protocols.next().ok_or(NegotiationError::Failed)?;
self.state = SeqState::SendProtocol { io, protocol };
}
SeqState::SendProtocol { mut io, protocol } => {
let p = Protocol::try_from(protocol.as_ref())?;
if io.start_send(Message::Protocol(p.clone()))?.is_not_ready() {
self.state = SeqState::SendProtocol { io, protocol };
return Ok(Async::NotReady)
}
debug!("Dialer: Proposed protocol: {}", p);
if self.protocols.peek().is_some() {
self.state = SeqState::FlushProtocol { io, protocol }
} else {
debug!("Dialer: Expecting proposed protocol: {}", p);
let io = Negotiated::expecting(io.into_reader(), p);
return Ok(Async::Ready((protocol, io)))
}
}
SeqState::FlushProtocol { mut io, protocol } => {
if io.poll_complete()?.is_not_ready() {
self.state = SeqState::FlushProtocol { io, protocol };
return Ok(Async::NotReady)
}
self.state = SeqState::AwaitProtocol { io, protocol }
}
SeqState::AwaitProtocol { mut io, protocol } => {
let msg = match io.poll()? {
Async::NotReady => {
self.state = SeqState::AwaitProtocol { io, protocol };
return Ok(Async::NotReady)
}
Async::Ready(None) =>
return Err(NegotiationError::from(
io::Error::from(io::ErrorKind::UnexpectedEof))),
Async::Ready(Some(msg)) => msg,
};
match msg {
Message::Header(Version::V1) => {
self.state = SeqState::AwaitProtocol { io, protocol };
}
Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
debug!("Dialer: Received confirmation for protocol: {}", p);
let (io, remaining) = io.into_inner();
let io = Negotiated::completed(io, remaining);
return Ok(Async::Ready((protocol, io)))
}
Message::NotAvailable => {
debug!("Dialer: Received rejection of protocol: {}",
String::from_utf8_lossy(protocol.as_ref()));
let protocol = self.protocols.next()
.ok_or(NegotiationError::Failed)?;
self.state = SeqState::SendProtocol { io, protocol }
}
_ => return Err(ProtocolError::InvalidMessage.into())
}
}
SeqState::Done => panic!("SeqState::poll called after completion")
}
}
}
}
/// A `Future` returned by [`dialer_select_proto_parallel`] which negotiates
/// a protocol selectively by considering all supported protocols of the remote
/// "in parallel".
pub struct DialerSelectPar<R, I>
where
R: AsyncRead + AsyncWrite,
I: Iterator,
I::Item: AsRef<[u8]>
{
inner: DialerSelectParState<R, I>
protocols: I,
state: ParState<R, I::Item>
}
enum DialerSelectParState<R, I>
enum ParState<R, N>
where
R: AsyncRead + AsyncWrite,
I: Iterator,
I::Item: AsRef<[u8]>
N: AsRef<[u8]>
{
AwaitDialer {
dialer_fut: DialerFuture<R, I::Item>,
protocols: I
},
ProtocolList {
dialer: Dialer<R, I::Item>,
protocols: I
},
FlushListRequest {
dialer: Dialer<R, I::Item>,
protocols: I
},
AwaitListResponse {
stream: StreamFuture<Dialer<R, I::Item>>,
protocols: I,
},
Protocol {
dialer: Dialer<R, I::Item>,
proto_name: I::Item
},
FlushProtocol {
dialer: Dialer<R, I::Item>,
proto_name: I::Item
},
AwaitProtocol {
stream: StreamFuture<Dialer<R, I::Item>>,
proto_name: I::Item
},
Undefined
SendHeader { io: MessageIO<R> },
SendProtocolsRequest { io: MessageIO<R> },
Flush { io: MessageIO<R> },
RecvProtocols { io: MessageIO<R> },
SendProtocol { io: MessageIO<R>, protocol: N },
Done
}
impl<R, I> Future for DialerSelectPar<R, I>
where
R: AsyncRead + AsyncWrite,
I: Iterator,
I::Item: AsRef<[u8]> + Clone
I::Item: AsRef<[u8]>
{
type Item = (I::Item, Negotiated<R>);
type Error = ProtocolChoiceError;
type Error = NegotiationError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match mem::replace(&mut self.inner, DialerSelectParState::Undefined) {
DialerSelectParState::AwaitDialer { mut dialer_fut, protocols } => {
match dialer_fut.poll()? {
Async::Ready(dialer) => {
self.inner = DialerSelectParState::ProtocolList { dialer, protocols }
}
match mem::replace(&mut self.state, ParState::Done) {
ParState::SendHeader { mut io } => {
if io.start_send(Message::Header(Version::V1))?.is_not_ready() {
self.state = ParState::SendHeader { io };
return Ok(Async::NotReady)
}
self.state = ParState::SendProtocolsRequest { io };
}
ParState::SendProtocolsRequest { mut io } => {
if io.start_send(Message::ListProtocols)?.is_not_ready() {
self.state = ParState::SendProtocolsRequest { io };
return Ok(Async::NotReady)
}
debug!("Dialer: Requested supported protocols.");
self.state = ParState::Flush { io }
}
ParState::Flush { mut io } => {
if io.poll_complete()?.is_not_ready() {
self.state = ParState::Flush { io };
return Ok(Async::NotReady)
}
self.state = ParState::RecvProtocols { io }
}
ParState::RecvProtocols { mut io } => {
let msg = match io.poll()? {
Async::NotReady => {
self.inner = DialerSelectParState::AwaitDialer { dialer_fut, protocols };
self.state = ParState::RecvProtocols { io };
return Ok(Async::NotReady)
}
}
}
DialerSelectParState::ProtocolList { mut dialer, protocols } => {
trace!("requesting protocols list");
match dialer.start_send(Request::ListProtocols)? {
AsyncSink::Ready => {
self.inner = DialerSelectParState::FlushListRequest {
dialer,
protocols
}
}
AsyncSink::NotReady(_) => {
self.inner = DialerSelectParState::ProtocolList { dialer, protocols };
return Ok(Async::NotReady)
}
}
}
DialerSelectParState::FlushListRequest { mut dialer, protocols } => {
match dialer.poll_complete()? {
Async::Ready(()) => {
self.inner = DialerSelectParState::AwaitListResponse {
stream: dialer.into_future(),
protocols
}
}
Async::NotReady => {
self.inner = DialerSelectParState::FlushListRequest {
dialer,
protocols
};
return Ok(Async::NotReady)
}
}
}
DialerSelectParState::AwaitListResponse { mut stream, protocols } => {
let (resp, dialer) = match stream.poll() {
Ok(Async::Ready(x)) => x,
Ok(Async::NotReady) => {
self.inner = DialerSelectParState::AwaitListResponse { stream, protocols };
return Ok(Async::NotReady)
}
Err((e, _)) => return Err(ProtocolChoiceError::from(e))
Async::Ready(None) =>
return Err(NegotiationError::from(
io::Error::from(io::ErrorKind::UnexpectedEof))),
Async::Ready(Some(msg)) => msg,
};
trace!("protocols list response: {:?}", resp);
let supported =
if let Some(Response::SupportedProtocols { protocols }) = resp {
protocols
} else {
return Err(ProtocolChoiceError::UnexpectedMessage)
};
let mut found = None;
for local_name in protocols {
for remote_name in &supported {
if remote_name.as_ref() == local_name.as_ref() {
found = Some(local_name);
break;
}
match &msg {
Message::Header(Version::V1) => {
self.state = ParState::RecvProtocols { io }
}
if found.is_some() {
break;
}
}
let proto_name = found.ok_or(ProtocolChoiceError::NoProtocolFound)?;
self.inner = DialerSelectParState::Protocol { dialer, proto_name }
}
DialerSelectParState::Protocol { mut dialer, proto_name } => {
trace!("Requesting protocol: {:?}", proto_name.as_ref());
let req = Request::Protocol { name: proto_name.clone() };
match dialer.start_send(req)? {
AsyncSink::Ready => {
self.inner = DialerSelectParState::FlushProtocol { dialer, proto_name }
}
AsyncSink::NotReady(_) => {
self.inner = DialerSelectParState::Protocol { dialer, proto_name };
return Ok(Async::NotReady)
Message::Protocols(supported) => {
let protocol = self.protocols.by_ref()
.find(|p| supported.iter().any(|s|
s.as_ref() == p.as_ref()))
.ok_or(NegotiationError::Failed)?;
debug!("Dialer: Found supported protocol: {}",
String::from_utf8_lossy(protocol.as_ref()));
self.state = ParState::SendProtocol { io, protocol };
}
_ => return Err(ProtocolError::InvalidMessage.into())
}
}
DialerSelectParState::FlushProtocol { mut dialer, proto_name } => {
match dialer.poll_complete()? {
Async::Ready(()) => {
self.inner = DialerSelectParState::AwaitProtocol {
stream: dialer.into_future(),
proto_name
}
}
Async::NotReady => {
self.inner = DialerSelectParState::FlushProtocol { dialer, proto_name };
return Ok(Async::NotReady)
}
ParState::SendProtocol { mut io, protocol } => {
let p = Protocol::try_from(protocol.as_ref())?;
if io.start_send(Message::Protocol(p.clone()))?.is_not_ready() {
self.state = ParState::SendProtocol { io, protocol };
return Ok(Async::NotReady)
}
debug!("Dialer: Expecting proposed protocol: {}", p);
let io = Negotiated::expecting(io.into_reader(), p);
return Ok(Async::Ready((protocol, io)))
}
DialerSelectParState::AwaitProtocol { mut stream, proto_name } => {
let (resp, dialer) = match stream.poll() {
Ok(Async::Ready(x)) => x,
Ok(Async::NotReady) => {
self.inner = DialerSelectParState::AwaitProtocol { stream, proto_name };
return Ok(Async::NotReady)
}
Err((e, _)) => return Err(ProtocolChoiceError::from(e))
};
trace!("received {:?}", resp);
match resp {
Some(Response::Protocol { ref name })
if name.as_ref() == proto_name.as_ref() =>
{
return Ok(Async::Ready((proto_name, Negotiated(dialer.into_inner()))))
}
_ => return Err(ProtocolChoiceError::UnexpectedMessage)
}
}
DialerSelectParState::Undefined =>
panic!("DialerSelectParState::poll called after completion")
ParState::Done => panic!("ParState::poll called after completion")
}
}
}