// Copyright 2018 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError}; use crate::{connection::ConnectedPoint, Negotiated}; use futures::{future::Either, prelude::*}; use log::debug; use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture}; use std::{iter, mem, pin::Pin, task::Context, task::Poll}; pub use multistream_select::Version; use smallvec::SmallVec; use std::fmt; // TODO: Still needed? /// Applies an upgrade to the inbound and outbound direction of a connection or substream. pub fn apply( conn: C, up: U, cp: ConnectedPoint, v: Version, ) -> Either, OutboundUpgradeApply> where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade> + OutboundUpgrade>, { match cp { ConnectedPoint::Dialer { role_override, .. } if role_override.is_dialer() => { Either::Right(apply_outbound(conn, up, v)) } _ => Either::Left(apply_inbound(conn, up)), } } /// Tries to perform an upgrade on an inbound connection or substream. pub fn apply_inbound(conn: C, up: U) -> InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade>, { let iter = up .protocol_info() .into_iter() .map(NameWrap as fn(_) -> NameWrap<_>); let future = multistream_select::listener_select_proto(conn, iter); InboundUpgradeApply { inner: InboundUpgradeApplyState::Init { future, upgrade: up, }, } } /// Tries to perform an upgrade on an outbound connection or substream. pub fn apply_outbound(conn: C, up: U, v: Version) -> OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, U: OutboundUpgrade>, { let iter = up .protocol_info() .into_iter() .map(NameWrap as fn(_) -> NameWrap<_>); let future = multistream_select::dialer_select_proto(conn, iter, v); OutboundUpgradeApply { inner: OutboundUpgradeApplyState::Init { future, upgrade: up, }, } } /// Future returned by `apply_inbound`. Drives the upgrade process. pub struct InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade>, { inner: InboundUpgradeApplyState, } enum InboundUpgradeApplyState where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade>, { Init { future: ListenerSelectFuture>, upgrade: U, }, Upgrade { future: Pin>, name: SmallVec<[u8; 32]>, }, Undefined, } impl Unpin for InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade>, { } impl Future for InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade>, { type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match mem::replace(&mut self.inner, InboundUpgradeApplyState::Undefined) { InboundUpgradeApplyState::Init { mut future, upgrade, } => { let (info, io) = match Future::poll(Pin::new(&mut future), cx)? { Poll::Ready(x) => x, Poll::Pending => { self.inner = InboundUpgradeApplyState::Init { future, upgrade }; return Poll::Pending; } }; let name = SmallVec::from_slice(info.protocol_name()); self.inner = InboundUpgradeApplyState::Upgrade { future: Box::pin(upgrade.upgrade_inbound(io, info.0)), name, }; } InboundUpgradeApplyState::Upgrade { mut future, name } => { match Future::poll(Pin::new(&mut future), cx) { Poll::Pending => { self.inner = InboundUpgradeApplyState::Upgrade { future, name }; return Poll::Pending; } Poll::Ready(Ok(x)) => { log::trace!("Upgraded inbound stream to {}", DisplayProtocolName(name)); return Poll::Ready(Ok(x)); } Poll::Ready(Err(e)) => { debug!( "Failed to upgrade inbound stream to {}", DisplayProtocolName(name) ); return Poll::Ready(Err(UpgradeError::Apply(e))); } } } InboundUpgradeApplyState::Undefined => { panic!("InboundUpgradeApplyState::poll called after completion") } } } } } /// Future returned by `apply_outbound`. Drives the upgrade process. pub struct OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, U: OutboundUpgrade>, { inner: OutboundUpgradeApplyState, } enum OutboundUpgradeApplyState where C: AsyncRead + AsyncWrite + Unpin, U: OutboundUpgrade>, { Init { future: DialerSelectFuture::IntoIter>>, upgrade: U, }, Upgrade { future: Pin>, name: SmallVec<[u8; 32]>, }, Undefined, } impl Unpin for OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, U: OutboundUpgrade>, { } impl Future for OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, U: OutboundUpgrade>, { type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match mem::replace(&mut self.inner, OutboundUpgradeApplyState::Undefined) { OutboundUpgradeApplyState::Init { mut future, upgrade, } => { let (info, connection) = match Future::poll(Pin::new(&mut future), cx)? { Poll::Ready(x) => x, Poll::Pending => { self.inner = OutboundUpgradeApplyState::Init { future, upgrade }; return Poll::Pending; } }; let name = SmallVec::from_slice(info.protocol_name()); self.inner = OutboundUpgradeApplyState::Upgrade { future: Box::pin(upgrade.upgrade_outbound(connection, info.0)), name, }; } OutboundUpgradeApplyState::Upgrade { mut future, name } => { match Future::poll(Pin::new(&mut future), cx) { Poll::Pending => { self.inner = OutboundUpgradeApplyState::Upgrade { future, name }; return Poll::Pending; } Poll::Ready(Ok(x)) => { log::trace!( "Upgraded outbound stream to {}", DisplayProtocolName(name) ); return Poll::Ready(Ok(x)); } Poll::Ready(Err(e)) => { debug!( "Failed to upgrade outbound stream to {}", DisplayProtocolName(name) ); return Poll::Ready(Err(UpgradeError::Apply(e))); } } } OutboundUpgradeApplyState::Undefined => { panic!("OutboundUpgradeApplyState::poll called after completion") } } } } } type NameWrapIter = iter::Map::Item) -> NameWrap<::Item>>; /// Wrapper type to expose an `AsRef<[u8]>` impl for all types implementing `ProtocolName`. #[derive(Clone)] struct NameWrap(N); impl AsRef<[u8]> for NameWrap { fn as_ref(&self) -> &[u8] { self.0.protocol_name() } } /// Wrapper for printing a [`ProtocolName`] that is expected to be mostly ASCII pub(crate) struct DisplayProtocolName(pub N); impl fmt::Display for DisplayProtocolName { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { use fmt::Write; for byte in self.0.protocol_name() { if (b' '..=b'~').contains(byte) { f.write_char(char::from(*byte))?; } else { write!(f, "<{byte:02X}>")?; } } Ok(()) } } #[cfg(test)] mod tests { use super::*; #[test] fn display_protocol_name() { assert_eq!(DisplayProtocolName(b"/hello/1.0").to_string(), "/hello/1.0"); assert_eq!(DisplayProtocolName("/hellö/").to_string(), "/hell/"); assert_eq!( DisplayProtocolName((0u8..=255).collect::>()).to_string(), (0..32) .map(|c| format!("<{c:02X}>")) .chain((32..127).map(|c| format!("{}", char::from_u32(c).unwrap()))) .chain((127..256).map(|c| format!("<{c:02X}>"))) .collect::() ); } }