Add ProtocolsHandler::InboundOpenInfo. (#1714)

* Add `ProtocolsHandler::InboundOpenInfo`.

* Update swarm/src/protocols_handler/multi.rs

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Update CHANGELOGs and versions.

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
This commit is contained in:
Toralf Wittner
2020-08-23 16:57:20 +02:00
committed by GitHub
parent 740a2e0235
commit 0b05b21093
32 changed files with 337 additions and 204 deletions

View File

@ -51,14 +51,16 @@ impl ProtocolsHandler for DummyProtocolsHandler {
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = Void;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(DeniedUpgrade)
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(DeniedUpgrade, ())
}
fn inject_fully_negotiated_inbound(
&mut self,
_: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
_: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::InboundOpenInfo
) {
}
@ -75,7 +77,7 @@ impl ProtocolsHandler for DummyProtocolsHandler {
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error>) {}
fn inject_listen_upgrade_error(&mut self, _: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Error>) {}
fn inject_listen_upgrade_error(&mut self, _: Self::InboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Error>) {}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive

View File

@ -59,17 +59,19 @@ where
type Error = TProtoHandler::Error;
type InboundProtocol = TProtoHandler::InboundProtocol;
type OutboundProtocol = TProtoHandler::OutboundProtocol;
type InboundOpenInfo = TProtoHandler::InboundOpenInfo;
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
self.inner.listen_protocol()
}
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output,
info: Self::InboundOpenInfo
) {
self.inner.inject_fully_negotiated_inbound(protocol)
self.inner.inject_fully_negotiated_inbound(protocol, info)
}
fn inject_fully_negotiated_outbound(
@ -94,11 +96,8 @@ where
self.inner.inject_dial_upgrade_error(info, error)
}
fn inject_listen_upgrade_error(
&mut self,
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
) {
self.inner.inject_listen_upgrade_error(error)
fn inject_listen_upgrade_error(&mut self, info: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>) {
self.inner.inject_listen_upgrade_error(info, error)
}
fn connection_keep_alive(&self) -> KeepAlive {

View File

@ -57,17 +57,19 @@ where
type Error = TProtoHandler::Error;
type InboundProtocol = TProtoHandler::InboundProtocol;
type OutboundProtocol = TProtoHandler::OutboundProtocol;
type InboundOpenInfo = TProtoHandler::InboundOpenInfo;
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
self.inner.listen_protocol()
}
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output,
info: Self::InboundOpenInfo
) {
self.inner.inject_fully_negotiated_inbound(protocol)
self.inner.inject_fully_negotiated_inbound(protocol, info)
}
fn inject_fully_negotiated_outbound(
@ -90,11 +92,8 @@ where
self.inner.inject_dial_upgrade_error(info, error)
}
fn inject_listen_upgrade_error(
&mut self,
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
) {
self.inner.inject_listen_upgrade_error(error)
fn inject_listen_upgrade_error(&mut self, info: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>) {
self.inner.inject_listen_upgrade_error(info, error)
}
fn connection_keep_alive(&self) -> KeepAlive {
@ -111,8 +110,8 @@ where
match ev {
ProtocolsHandlerEvent::Custom(ev) => ProtocolsHandlerEvent::Custom((self.map)(ev)),
ProtocolsHandlerEvent::Close(err) => ProtocolsHandlerEvent::Close(err),
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } => {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info }
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } => {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }
}
}
})

View File

@ -96,13 +96,21 @@ where
type Error = <H as ProtocolsHandler>::Error;
type InboundProtocol = Upgrade<K, <H as ProtocolsHandler>::InboundProtocol>;
type OutboundProtocol = <H as ProtocolsHandler>::OutboundProtocol;
type InboundOpenInfo = Info<K, <H as ProtocolsHandler>::InboundOpenInfo>;
type OutboundOpenInfo = (K, <H as ProtocolsHandler>::OutboundOpenInfo);
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
let upgrades = self.handlers.iter()
.map(|(k, h)| (k.clone(), h.listen_protocol().into_upgrade().1))
.collect();
SubstreamProtocol::new(Upgrade { upgrades })
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
let (upgrade, info) = self.handlers.iter()
.map(|(k, h)| {
let (_, u, i) = h.listen_protocol().into_upgrade();
(k.clone(), (u, i))
})
.fold((Upgrade::new(), Info::new()), |(mut upg, mut inf), (k, (u, i))| {
upg.upgrades.push((k.clone(), u));
inf.infos.push((k, i));
(upg, inf)
});
SubstreamProtocol::new(upgrade, info)
}
fn inject_fully_negotiated_outbound (
@ -119,10 +127,13 @@ where
fn inject_fully_negotiated_inbound (
&mut self,
(key, arg): <Self::InboundProtocol as InboundUpgradeSend>::Output
(key, arg): <Self::InboundProtocol as InboundUpgradeSend>::Output,
mut info: Self::InboundOpenInfo
) {
if let Some(h) = self.handlers.get_mut(&key) {
h.inject_fully_negotiated_inbound(arg)
if let Some(i) = info.take(&key) {
h.inject_fully_negotiated_inbound(arg, i)
}
} else {
log::error!("inject_fully_negotiated_inbound: no handler for key")
}
@ -156,47 +167,64 @@ where
fn inject_listen_upgrade_error(
&mut self,
mut info: Self::InboundOpenInfo,
error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>
) {
match error {
ProtocolsHandlerUpgrErr::Timer =>
for h in self.handlers.values_mut() {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer)
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Timer)
}
}
ProtocolsHandlerUpgrErr::Timeout =>
for h in self.handlers.values_mut() {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout)
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Timeout)
}
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) =>
for h in self.handlers.values_mut() {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)))
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)))
}
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::ProtocolError(e))) =>
match e {
ProtocolError::IoError(e) =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::IoError(e.kind().into()));
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::IoError(e.kind().into()));
h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
}
ProtocolError::InvalidMessage =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
}
ProtocolError::InvalidProtocol =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
}
ProtocolError::TooManyProtocols =>
for h in self.handlers.values_mut() {
let e = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e)))
}
}
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) =>
if let Some(h) = self.handlers.get_mut(&k) {
h.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)))
if let Some(i) = info.take(&k) {
h.inject_listen_upgrade_error(i, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)))
}
}
}
}
@ -309,12 +337,37 @@ impl<H: ProtocolName> ProtocolName for IndexedProtoName<H> {
}
}
/// The aggregated `InboundOpenInfo`s of supported inbound substream protocols.
#[derive(Clone)]
pub struct Info<K, I> {
infos: Vec<(K, I)>
}
impl<K: Eq, I> Info<K, I> {
fn new() -> Self {
Info { infos: Vec::new() }
}
pub fn take(&mut self, k: &K) -> Option<I> {
if let Some(p) = self.infos.iter().position(|(key, _)| key == k) {
return Some(self.infos.remove(p).1)
}
None
}
}
/// Inbound and outbound upgrade for all `ProtocolsHandler`s.
#[derive(Clone)]
pub struct Upgrade<K, H> {
upgrades: Vec<(K, H)>
}
impl<K, H> Upgrade<K, H> {
fn new() -> Self {
Upgrade { upgrades: Vec::new() }
}
}
impl<K, H> fmt::Debug for Upgrade<K, H>
where
K: fmt::Debug + Eq + Hash,
@ -433,4 +486,3 @@ impl fmt::Display for DuplicateProtonameError {
}
impl error::Error for DuplicateProtonameError {}

View File

@ -95,8 +95,11 @@ where
/// The underlying handler.
handler: TProtoHandler,
/// Futures that upgrade incoming substreams.
negotiating_in:
Vec<(InboundUpgradeApply<Substream<StreamMuxerBox>, SendWrapper<TProtoHandler::InboundProtocol>>, Delay)>,
negotiating_in: Vec<(
TProtoHandler::InboundOpenInfo,
InboundUpgradeApply<Substream<StreamMuxerBox>, SendWrapper<TProtoHandler::InboundProtocol>>,
Delay
)>,
/// Futures that upgrade outgoing substreams. The first element of the tuple is the userdata
/// to pass back once successfully opened.
negotiating_out: Vec<(
@ -192,9 +195,10 @@ where
SubstreamEndpoint::Listener => {
let protocol = self.handler.listen_protocol();
let timeout = protocol.timeout().clone();
let upgrade = upgrade::apply_inbound(substream, SendWrapper(protocol.into_upgrade().1));
let (_, upgrade, info) = protocol.into_upgrade();
let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade));
let timeout = Delay::new(timeout);
self.negotiating_in.push((upgrade, timeout));
self.negotiating_in.push((info, upgrade, timeout));
}
SubstreamEndpoint::Dialer((upgrade_id, user_data, timeout)) => {
let pos = match self
@ -231,27 +235,27 @@ where
// Continue negotiation of newly-opened substreams on the listening side.
// We remove each element from `negotiating_in` one by one and add them back if not ready.
for n in (0..self.negotiating_in.len()).rev() {
let (mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n);
let (info, mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n);
match Future::poll(Pin::new(&mut timeout), cx) {
Poll::Ready(Ok(_)) => {
let err = ProtocolsHandlerUpgrErr::Timeout;
self.handler.inject_listen_upgrade_error(err);
self.handler.inject_listen_upgrade_error(info, err);
continue
}
Poll::Ready(Err(_)) => {
let err = ProtocolsHandlerUpgrErr::Timer;
self.handler.inject_listen_upgrade_error(err);
self.handler.inject_listen_upgrade_error(info, err);
continue;
}
Poll::Pending => {},
}
match Future::poll(Pin::new(&mut in_progress), cx) {
Poll::Ready(Ok(upgrade)) =>
self.handler.inject_fully_negotiated_inbound(upgrade),
Poll::Pending => self.negotiating_in.push((in_progress, timeout)),
self.handler.inject_fully_negotiated_inbound(upgrade, info),
Poll::Pending => self.negotiating_in.push((info, in_progress, timeout)),
Poll::Ready(Err(err)) => {
let err = ProtocolsHandlerUpgrErr::Upgrade(err);
self.handler.inject_listen_upgrade_error(err);
self.handler.inject_listen_upgrade_error(info, err);
}
}
}
@ -308,14 +312,11 @@ where
Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event)));
}
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol,
info,
}) => {
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => {
let id = self.unique_dial_upgrade_id;
let timeout = protocol.timeout().clone();
self.unique_dial_upgrade_id += 1;
let (version, upgrade) = protocol.into_upgrade();
let (version, upgrade, info) = protocol.into_upgrade();
self.queued_dial_upgrades.push((id, (version, SendWrapper(upgrade))));
return Poll::Ready(Ok(
ConnectionHandlerEvent::OutboundSubstreamRequest((id, info, timeout)),

View File

@ -38,7 +38,7 @@ where
TOutbound: OutboundUpgradeSend,
{
/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<TInbound>,
listen_protocol: SubstreamProtocol<TInbound, ()>,
/// If `Some`, something bad happened and we should shut down the handler with an error.
pending_error: Option<ProtocolsHandlerUpgrErr<<TOutbound as OutboundUpgradeSend>::Error>>,
/// Queue of events to produce in `poll()`.
@ -62,7 +62,7 @@ where
{
/// Creates a `OneShotHandler`.
pub fn new(
listen_protocol: SubstreamProtocol<TInbound>,
listen_protocol: SubstreamProtocol<TInbound, ()>,
config: OneShotHandlerConfig,
) -> Self {
OneShotHandler {
@ -86,7 +86,7 @@ where
///
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
/// > substreams, not the ones already being negotiated.
pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<TInbound> {
pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<TInbound, ()> {
&self.listen_protocol
}
@ -94,7 +94,7 @@ where
///
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
/// > substreams, not the ones already being negotiated.
pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<TInbound> {
pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<TInbound, ()> {
&mut self.listen_protocol
}
@ -113,21 +113,20 @@ where
{
fn default() -> Self {
OneShotHandler::new(
SubstreamProtocol::new(Default::default()),
SubstreamProtocol::new(Default::default(), ()),
OneShotHandlerConfig::default()
)
}
}
impl<TInbound, TOutbound, TEvent> ProtocolsHandler
for OneShotHandler<TInbound, TOutbound, TEvent>
impl<TInbound, TOutbound, TEvent> ProtocolsHandler for OneShotHandler<TInbound, TOutbound, TEvent>
where
TInbound: InboundUpgradeSend + Send + 'static,
TOutbound: OutboundUpgradeSend,
TInbound::Output: Into<TEvent>,
TOutbound::Output: Into<TEvent>,
TOutbound::Error: error::Error + Send + 'static,
SubstreamProtocol<TInbound>: Clone,
SubstreamProtocol<TInbound, ()>: Clone,
TEvent: Send + 'static,
{
type InEvent = TOutbound;
@ -138,14 +137,16 @@ where
type InboundProtocol = TInbound;
type OutboundProtocol = TOutbound;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
self.listen_protocol.clone()
}
fn inject_fully_negotiated_inbound(
&mut self,
out: <Self::InboundProtocol as InboundUpgradeSend>::Output,
(): Self::InboundOpenInfo
) {
// If we're shutting down the connection for inactivity, reset the timeout.
if !self.keep_alive.is_yes() {
@ -208,9 +209,8 @@ where
let upgrade = self.dial_queue.remove(0);
return Poll::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(upgrade)
.with_timeout(self.config.outbound_substream_timeout),
info: (),
protocol: SubstreamProtocol::new(upgrade, ())
.with_timeout(self.config.outbound_substream_timeout)
},
);
}
@ -256,7 +256,7 @@ mod tests {
#[test]
fn do_not_keep_idle_connection_alive() {
let mut handler: OneShotHandler<_, DeniedUpgrade, Void> = OneShotHandler::new(
SubstreamProtocol::new(DeniedUpgrade{}),
SubstreamProtocol::new(DeniedUpgrade{}, ()),
Default::default(),
);

View File

@ -105,13 +105,16 @@ where
type InboundProtocol = SelectUpgrade<SendWrapper<<TProto1 as ProtocolsHandler>::InboundProtocol>, SendWrapper<<TProto2 as ProtocolsHandler>::InboundProtocol>>;
type OutboundProtocol = EitherUpgrade<SendWrapper<TProto1::OutboundProtocol>, SendWrapper<TProto2::OutboundProtocol>>;
type OutboundOpenInfo = EitherOutput<TProto1::OutboundOpenInfo, TProto2::OutboundOpenInfo>;
type InboundOpenInfo = (TProto1::InboundOpenInfo, TProto2::InboundOpenInfo);
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
let proto1 = self.proto1.listen_protocol();
let proto2 = self.proto2.listen_protocol();
let timeout = std::cmp::max(proto1.timeout(), proto2.timeout()).clone();
let choice = SelectUpgrade::new(SendWrapper(proto1.into_upgrade().1), SendWrapper(proto2.into_upgrade().1));
SubstreamProtocol::new(choice).with_timeout(timeout)
let (_, u1, i1) = proto1.into_upgrade();
let (_, u2, i2) = proto2.into_upgrade();
let choice = SelectUpgrade::new(SendWrapper(u1), SendWrapper(u2));
SubstreamProtocol::new(choice, (i1, i2)).with_timeout(timeout)
}
fn inject_fully_negotiated_outbound(&mut self, protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output, endpoint: Self::OutboundOpenInfo) {
@ -127,12 +130,12 @@ where
}
}
fn inject_fully_negotiated_inbound(&mut self, protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output) {
fn inject_fully_negotiated_inbound(&mut self, protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output, (i1, i2): Self::InboundOpenInfo) {
match protocol {
EitherOutput::First(protocol) =>
self.proto1.inject_fully_negotiated_inbound(protocol),
self.proto1.inject_fully_negotiated_inbound(protocol, i1),
EitherOutput::Second(protocol) =>
self.proto2.inject_fully_negotiated_inbound(protocol)
self.proto2.inject_fully_negotiated_inbound(protocol, i2)
}
}
@ -143,9 +146,9 @@ where
}
}
fn inject_address_change(&mut self, addr: &Multiaddr) {
self.proto1.inject_address_change(addr);
self.proto2.inject_address_change(addr)
fn inject_address_change(&mut self, new_address: &Multiaddr) {
self.proto1.inject_address_change(new_address);
self.proto2.inject_address_change(new_address)
}
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
@ -183,19 +186,19 @@ where
}
}
fn inject_listen_upgrade_error(&mut self, error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>) {
fn inject_listen_upgrade_error(&mut self, (i1, i2): Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>) {
match error {
ProtocolsHandlerUpgrErr::Timer => {
self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer);
self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timer);
self.proto1.inject_listen_upgrade_error(i1, ProtocolsHandlerUpgrErr::Timer);
self.proto2.inject_listen_upgrade_error(i2, ProtocolsHandlerUpgrErr::Timer)
}
ProtocolsHandlerUpgrErr::Timeout => {
self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout);
self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Timeout);
self.proto1.inject_listen_upgrade_error(i1, ProtocolsHandlerUpgrErr::Timeout);
self.proto2.inject_listen_upgrade_error(i2, ProtocolsHandlerUpgrErr::Timeout)
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)));
self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)));
self.proto1.inject_listen_upgrade_error(i1, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)));
self.proto2.inject_listen_upgrade_error(i2, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)));
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::ProtocolError(e))) => {
let (e1, e2);
@ -217,14 +220,14 @@ where
e2 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols)
}
}
self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e1)));
self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e2)))
self.proto1.inject_listen_upgrade_error(i1, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e1)));
self.proto2.inject_listen_upgrade_error(i2, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(e2)))
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => {
self.proto1.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)))
self.proto1.inject_listen_upgrade_error(i1, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)))
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => {
self.proto2.inject_listen_upgrade_error(ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)))
self.proto2.inject_listen_upgrade_error(i2, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)))
}
}
}
@ -234,7 +237,6 @@ where
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> {
match self.proto1.poll(cx) {
Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::First(event)));
@ -242,13 +244,11 @@ where
Poll::Ready(ProtocolsHandlerEvent::Close(event)) => {
return Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::A(event)));
},
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol,
info,
}) => {
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => {
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(|u| EitherUpgrade::A(SendWrapper(u))),
info: EitherOutput::First(info),
protocol: protocol
.map_upgrade(|u| EitherUpgrade::A(SendWrapper(u)))
.map_info(EitherOutput::First)
});
},
Poll::Pending => ()
@ -261,13 +261,11 @@ where
Poll::Ready(ProtocolsHandlerEvent::Close(event)) => {
return Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(event)));
},
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol,
info,
}) => {
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => {
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(|u| EitherUpgrade::B(SendWrapper(u))),
info: EitherOutput::Second(info),
protocol: protocol
.map_upgrade(|u| EitherUpgrade::B(SendWrapper(u)))
.map_info(EitherOutput::Second)
});
},
Poll::Pending => ()