mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-24 15:21:33 +00:00
feat: replace ProtocolName
with AsRef<str>
Previously, a protocol could be any sequence of bytes as long as it started with `/`. Now, we directly parse a protocol as `String` which enforces it to be valid UTF8. To notify users of this change, we delete the `ProtocolName` trait. The new requirement is that users need to provide a type that implements `AsRef<str>`. We also add a `StreamProtocol` newtype in `libp2p-swarm` which provides an easy way for users to ensure their protocol strings are compliant. The newtype enforces that protocol strings start with `/`. `StreamProtocol` also implements `AsRef<str>`, meaning users can directly use it in their upgrades. `multistream-select` by itself only changes marginally with this patch. The only thing we enforce in the type-system is that protocols must implement `AsRef<str>`. Resolves: #2831. Pull-Request: #3746.
This commit is contained in:
@ -22,7 +22,7 @@ use crate::muxing::StreamMuxerEvent;
|
||||
use crate::{
|
||||
muxing::StreamMuxer,
|
||||
transport::{ListenerId, Transport, TransportError, TransportEvent},
|
||||
Multiaddr, ProtocolName,
|
||||
Multiaddr,
|
||||
};
|
||||
use either::Either;
|
||||
use futures::prelude::*;
|
||||
@ -115,21 +115,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum EitherName<A, B> {
|
||||
A(A),
|
||||
B(B),
|
||||
}
|
||||
|
||||
impl<A: ProtocolName, B: ProtocolName> ProtocolName for EitherName<A, B> {
|
||||
fn protocol_name(&self) -> &[u8] {
|
||||
match self {
|
||||
EitherName::A(a) => a.protocol_name(),
|
||||
EitherName::B(b) => b.protocol_name(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Transport for Either<A, B>
|
||||
where
|
||||
B: Transport,
|
||||
|
@ -127,7 +127,7 @@ pub use peer_record::PeerRecord;
|
||||
pub use signed_envelope::SignedEnvelope;
|
||||
pub use translation::address_translation;
|
||||
pub use transport::Transport;
|
||||
pub use upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError, UpgradeInfo};
|
||||
pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError, UpgradeInfo};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub struct DecodeError(String);
|
||||
|
@ -80,58 +80,11 @@ pub use self::{
|
||||
pub use crate::Negotiated;
|
||||
pub use multistream_select::{NegotiatedComplete, NegotiationError, ProtocolError, Version};
|
||||
|
||||
/// Types serving as protocol names.
|
||||
///
|
||||
/// # Context
|
||||
///
|
||||
/// In situations where we provide a list of protocols that we support,
|
||||
/// the elements of that list are required to implement the [`ProtocolName`] trait.
|
||||
///
|
||||
/// Libp2p will call [`ProtocolName::protocol_name`] on each element of that list, and transmit the
|
||||
/// returned value on the network. If the remote accepts a given protocol, the element
|
||||
/// serves as the return value of the function that performed the negotiation.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use libp2p_core::ProtocolName;
|
||||
///
|
||||
/// enum MyProtocolName {
|
||||
/// Version1,
|
||||
/// Version2,
|
||||
/// Version3,
|
||||
/// }
|
||||
///
|
||||
/// impl ProtocolName for MyProtocolName {
|
||||
/// fn protocol_name(&self) -> &[u8] {
|
||||
/// match *self {
|
||||
/// MyProtocolName::Version1 => b"/myproto/1.0",
|
||||
/// MyProtocolName::Version2 => b"/myproto/2.0",
|
||||
/// MyProtocolName::Version3 => b"/myproto/3.0",
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
pub trait ProtocolName {
|
||||
/// The protocol name as bytes. Transmitted on the network.
|
||||
///
|
||||
/// **Note:** Valid protocol names must start with `/` and
|
||||
/// not exceed 140 bytes in length.
|
||||
fn protocol_name(&self) -> &[u8];
|
||||
}
|
||||
|
||||
impl<T: AsRef<[u8]>> ProtocolName for T {
|
||||
fn protocol_name(&self) -> &[u8] {
|
||||
self.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
/// Common trait for upgrades that can be applied on inbound substreams, outbound substreams,
|
||||
/// or both.
|
||||
pub trait UpgradeInfo {
|
||||
/// Opaque type representing a negotiable protocol.
|
||||
type Info: ProtocolName + Clone;
|
||||
type Info: AsRef<str> + Clone;
|
||||
/// Iterator returned by `protocol_info`.
|
||||
type InfoIter: IntoIterator<Item = Self::Info>;
|
||||
|
||||
|
@ -18,16 +18,14 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError};
|
||||
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
|
||||
use crate::{connection::ConnectedPoint, Negotiated};
|
||||
use futures::{future::Either, prelude::*};
|
||||
use log::debug;
|
||||
use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture};
|
||||
use std::{iter, mem, pin::Pin, task::Context, task::Poll};
|
||||
use std::{mem, pin::Pin, task::Context, task::Poll};
|
||||
|
||||
pub(crate) use multistream_select::Version;
|
||||
use smallvec::SmallVec;
|
||||
use std::fmt;
|
||||
|
||||
// TODO: Still needed?
|
||||
/// Applies an upgrade to the inbound and outbound direction of a connection or substream.
|
||||
@ -55,14 +53,9 @@ where
|
||||
C: AsyncRead + AsyncWrite + Unpin,
|
||||
U: InboundUpgrade<Negotiated<C>>,
|
||||
{
|
||||
let iter = up
|
||||
.protocol_info()
|
||||
.into_iter()
|
||||
.map(NameWrap as fn(_) -> NameWrap<_>);
|
||||
let future = multistream_select::listener_select_proto(conn, iter);
|
||||
InboundUpgradeApply {
|
||||
inner: InboundUpgradeApplyState::Init {
|
||||
future,
|
||||
future: multistream_select::listener_select_proto(conn, up.protocol_info().into_iter()),
|
||||
upgrade: up,
|
||||
},
|
||||
}
|
||||
@ -74,14 +67,9 @@ where
|
||||
C: AsyncRead + AsyncWrite + Unpin,
|
||||
U: OutboundUpgrade<Negotiated<C>>,
|
||||
{
|
||||
let iter = up
|
||||
.protocol_info()
|
||||
.into_iter()
|
||||
.map(NameWrap as fn(_) -> NameWrap<_>);
|
||||
let future = multistream_select::dialer_select_proto(conn, iter, v);
|
||||
OutboundUpgradeApply {
|
||||
inner: OutboundUpgradeApplyState::Init {
|
||||
future,
|
||||
future: multistream_select::dialer_select_proto(conn, up.protocol_info(), v),
|
||||
upgrade: up,
|
||||
},
|
||||
}
|
||||
@ -96,18 +84,19 @@ where
|
||||
inner: InboundUpgradeApplyState<C, U>,
|
||||
}
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
enum InboundUpgradeApplyState<C, U>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite + Unpin,
|
||||
U: InboundUpgrade<Negotiated<C>>,
|
||||
{
|
||||
Init {
|
||||
future: ListenerSelectFuture<C, NameWrap<U::Info>>,
|
||||
future: ListenerSelectFuture<C, U::Info>,
|
||||
upgrade: U,
|
||||
},
|
||||
Upgrade {
|
||||
future: Pin<Box<U::Future>>,
|
||||
name: SmallVec<[u8; 32]>,
|
||||
name: String,
|
||||
},
|
||||
Undefined,
|
||||
}
|
||||
@ -140,10 +129,9 @@ where
|
||||
return Poll::Pending;
|
||||
}
|
||||
};
|
||||
let name = SmallVec::from_slice(info.protocol_name());
|
||||
self.inner = InboundUpgradeApplyState::Upgrade {
|
||||
future: Box::pin(upgrade.upgrade_inbound(io, info.0)),
|
||||
name,
|
||||
future: Box::pin(upgrade.upgrade_inbound(io, info.clone())),
|
||||
name: info.as_ref().to_owned(),
|
||||
};
|
||||
}
|
||||
InboundUpgradeApplyState::Upgrade { mut future, name } => {
|
||||
@ -153,14 +141,11 @@ where
|
||||
return Poll::Pending;
|
||||
}
|
||||
Poll::Ready(Ok(x)) => {
|
||||
log::trace!("Upgraded inbound stream to {}", DisplayProtocolName(name));
|
||||
log::trace!("Upgraded inbound stream to {name}");
|
||||
return Poll::Ready(Ok(x));
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
debug!(
|
||||
"Failed to upgrade inbound stream to {}",
|
||||
DisplayProtocolName(name)
|
||||
);
|
||||
debug!("Failed to upgrade inbound stream to {name}");
|
||||
return Poll::Ready(Err(UpgradeError::Apply(e)));
|
||||
}
|
||||
}
|
||||
@ -188,12 +173,12 @@ where
|
||||
U: OutboundUpgrade<Negotiated<C>>,
|
||||
{
|
||||
Init {
|
||||
future: DialerSelectFuture<C, NameWrapIter<<U::InfoIter as IntoIterator>::IntoIter>>,
|
||||
future: DialerSelectFuture<C, <U::InfoIter as IntoIterator>::IntoIter>,
|
||||
upgrade: U,
|
||||
},
|
||||
Upgrade {
|
||||
future: Pin<Box<U::Future>>,
|
||||
name: SmallVec<[u8; 32]>,
|
||||
name: String,
|
||||
},
|
||||
Undefined,
|
||||
}
|
||||
@ -226,10 +211,9 @@ where
|
||||
return Poll::Pending;
|
||||
}
|
||||
};
|
||||
let name = SmallVec::from_slice(info.protocol_name());
|
||||
self.inner = OutboundUpgradeApplyState::Upgrade {
|
||||
future: Box::pin(upgrade.upgrade_outbound(connection, info.0)),
|
||||
name,
|
||||
future: Box::pin(upgrade.upgrade_outbound(connection, info.clone())),
|
||||
name: info.as_ref().to_owned(),
|
||||
};
|
||||
}
|
||||
OutboundUpgradeApplyState::Upgrade { mut future, name } => {
|
||||
@ -239,17 +223,11 @@ where
|
||||
return Poll::Pending;
|
||||
}
|
||||
Poll::Ready(Ok(x)) => {
|
||||
log::trace!(
|
||||
"Upgraded outbound stream to {}",
|
||||
DisplayProtocolName(name)
|
||||
);
|
||||
log::trace!("Upgraded outbound stream to {name}",);
|
||||
return Poll::Ready(Ok(x));
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
debug!(
|
||||
"Failed to upgrade outbound stream to {}",
|
||||
DisplayProtocolName(name)
|
||||
);
|
||||
debug!("Failed to upgrade outbound stream to {name}",);
|
||||
return Poll::Ready(Err(UpgradeError::Apply(e)));
|
||||
}
|
||||
}
|
||||
@ -261,51 +239,3 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type NameWrapIter<I> = iter::Map<I, fn(<I as Iterator>::Item) -> NameWrap<<I as Iterator>::Item>>;
|
||||
|
||||
/// Wrapper type to expose an `AsRef<[u8]>` impl for all types implementing `ProtocolName`.
|
||||
#[derive(Clone)]
|
||||
struct NameWrap<N>(N);
|
||||
|
||||
impl<N: ProtocolName> AsRef<[u8]> for NameWrap<N> {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
self.0.protocol_name()
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper for printing a [`ProtocolName`] that is expected to be mostly ASCII
|
||||
pub(crate) struct DisplayProtocolName<N>(pub(crate) N);
|
||||
|
||||
impl<N: ProtocolName> fmt::Display for DisplayProtocolName<N> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
use fmt::Write;
|
||||
for byte in self.0.protocol_name() {
|
||||
if (b' '..=b'~').contains(byte) {
|
||||
f.write_char(char::from(*byte))?;
|
||||
} else {
|
||||
write!(f, "<{byte:02X}>")?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn display_protocol_name() {
|
||||
assert_eq!(DisplayProtocolName(b"/hello/1.0").to_string(), "/hello/1.0");
|
||||
assert_eq!(DisplayProtocolName("/hellö/").to_string(), "/hell<C3><B6>/");
|
||||
assert_eq!(
|
||||
DisplayProtocolName((0u8..=255).collect::<Vec<_>>()).to_string(),
|
||||
(0..32)
|
||||
.map(|c| format!("<{c:02X}>"))
|
||||
.chain((32..127).map(|c| format!("{}", char::from_u32(c).unwrap())))
|
||||
.chain((127..256).map(|c| format!("<{c:02X}>")))
|
||||
.collect::<String>()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ use void::Void;
|
||||
pub struct DeniedUpgrade;
|
||||
|
||||
impl UpgradeInfo for DeniedUpgrade {
|
||||
type Info = &'static [u8];
|
||||
type Info = &'static str;
|
||||
type InfoIter = iter::Empty<Self::Info>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
|
@ -19,7 +19,7 @@
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::{
|
||||
either::{EitherFuture, EitherName},
|
||||
either::EitherFuture,
|
||||
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
|
||||
};
|
||||
use either::Either;
|
||||
@ -31,7 +31,7 @@ where
|
||||
A: UpgradeInfo,
|
||||
B: UpgradeInfo,
|
||||
{
|
||||
type Info = EitherName<A::Info, B::Info>;
|
||||
type Info = Either<A::Info, B::Info>;
|
||||
type InfoIter = Either<
|
||||
Map<<A::InfoIter as IntoIterator>::IntoIter, fn(A::Info) -> Self::Info>,
|
||||
Map<<B::InfoIter as IntoIterator>::IntoIter, fn(B::Info) -> Self::Info>,
|
||||
@ -39,8 +39,8 @@ where
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
match self {
|
||||
Either::Left(a) => Either::Left(a.protocol_info().into_iter().map(EitherName::A)),
|
||||
Either::Right(b) => Either::Right(b.protocol_info().into_iter().map(EitherName::B)),
|
||||
Either::Left(a) => Either::Left(a.protocol_info().into_iter().map(Either::Left)),
|
||||
Either::Right(b) => Either::Right(b.protocol_info().into_iter().map(Either::Right)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -56,10 +56,10 @@ where
|
||||
|
||||
fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
|
||||
match (self, info) {
|
||||
(Either::Left(a), EitherName::A(info)) => {
|
||||
(Either::Left(a), Either::Left(info)) => {
|
||||
EitherFuture::First(a.upgrade_inbound(sock, info))
|
||||
}
|
||||
(Either::Right(b), EitherName::B(info)) => {
|
||||
(Either::Right(b), Either::Right(info)) => {
|
||||
EitherFuture::Second(b.upgrade_inbound(sock, info))
|
||||
}
|
||||
_ => panic!("Invalid invocation of EitherUpgrade::upgrade_inbound"),
|
||||
@ -78,10 +78,10 @@ where
|
||||
|
||||
fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
|
||||
match (self, info) {
|
||||
(Either::Left(a), EitherName::A(info)) => {
|
||||
(Either::Left(a), Either::Left(info)) => {
|
||||
EitherFuture::First(a.upgrade_outbound(sock, info))
|
||||
}
|
||||
(Either::Right(b), EitherName::B(info)) => {
|
||||
(Either::Right(b), Either::Right(info)) => {
|
||||
EitherFuture::Second(b.upgrade_outbound(sock, info))
|
||||
}
|
||||
_ => panic!("Invalid invocation of EitherUpgrade::upgrade_outbound"),
|
||||
|
@ -19,7 +19,7 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
|
||||
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
|
||||
use futures::future;
|
||||
use std::iter;
|
||||
use void::Void;
|
||||
@ -39,7 +39,7 @@ impl<P> PendingUpgrade<P> {
|
||||
|
||||
impl<P> UpgradeInfo for PendingUpgrade<P>
|
||||
where
|
||||
P: ProtocolName + Clone,
|
||||
P: AsRef<str> + Clone,
|
||||
{
|
||||
type Info = P;
|
||||
type InfoIter = iter::Once<P>;
|
||||
@ -51,7 +51,7 @@ where
|
||||
|
||||
impl<C, P> InboundUpgrade<C> for PendingUpgrade<P>
|
||||
where
|
||||
P: ProtocolName + Clone,
|
||||
P: AsRef<str> + Clone,
|
||||
{
|
||||
type Output = Void;
|
||||
type Error = Void;
|
||||
@ -64,7 +64,7 @@ where
|
||||
|
||||
impl<C, P> OutboundUpgrade<C> for PendingUpgrade<P>
|
||||
where
|
||||
P: ProtocolName + Clone,
|
||||
P: AsRef<str> + Clone,
|
||||
{
|
||||
type Output = Void;
|
||||
type Error = Void;
|
||||
|
@ -19,7 +19,7 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
|
||||
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
|
||||
use futures::future;
|
||||
use std::iter;
|
||||
use void::Void;
|
||||
@ -38,7 +38,7 @@ impl<P> ReadyUpgrade<P> {
|
||||
|
||||
impl<P> UpgradeInfo for ReadyUpgrade<P>
|
||||
where
|
||||
P: ProtocolName + Clone,
|
||||
P: AsRef<str> + Clone,
|
||||
{
|
||||
type Info = P;
|
||||
type InfoIter = iter::Once<P>;
|
||||
@ -50,7 +50,7 @@ where
|
||||
|
||||
impl<C, P> InboundUpgrade<C> for ReadyUpgrade<P>
|
||||
where
|
||||
P: ProtocolName + Clone,
|
||||
P: AsRef<str> + Clone,
|
||||
{
|
||||
type Output = C;
|
||||
type Error = Void;
|
||||
@ -63,7 +63,7 @@ where
|
||||
|
||||
impl<C, P> OutboundUpgrade<C> for ReadyUpgrade<P>
|
||||
where
|
||||
P: ProtocolName + Clone,
|
||||
P: AsRef<str> + Clone,
|
||||
{
|
||||
type Output = C;
|
||||
type Error = Void;
|
||||
|
@ -19,10 +19,7 @@
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::either::EitherFuture;
|
||||
use crate::{
|
||||
either::EitherName,
|
||||
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
|
||||
};
|
||||
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
|
||||
use either::Either;
|
||||
use futures::future;
|
||||
use std::iter::{Chain, Map};
|
||||
@ -48,7 +45,7 @@ where
|
||||
A: UpgradeInfo,
|
||||
B: UpgradeInfo,
|
||||
{
|
||||
type Info = EitherName<A::Info, B::Info>;
|
||||
type Info = Either<A::Info, B::Info>;
|
||||
type InfoIter = Chain<
|
||||
Map<<A::InfoIter as IntoIterator>::IntoIter, fn(A::Info) -> Self::Info>,
|
||||
Map<<B::InfoIter as IntoIterator>::IntoIter, fn(B::Info) -> Self::Info>,
|
||||
@ -59,12 +56,12 @@ where
|
||||
.0
|
||||
.protocol_info()
|
||||
.into_iter()
|
||||
.map(EitherName::A as fn(A::Info) -> _);
|
||||
.map(Either::Left as fn(A::Info) -> _);
|
||||
let b = self
|
||||
.1
|
||||
.protocol_info()
|
||||
.into_iter()
|
||||
.map(EitherName::B as fn(B::Info) -> _);
|
||||
.map(Either::Right as fn(B::Info) -> _);
|
||||
|
||||
a.chain(b)
|
||||
}
|
||||
@ -81,8 +78,8 @@ where
|
||||
|
||||
fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
|
||||
match info {
|
||||
EitherName::A(info) => EitherFuture::First(self.0.upgrade_inbound(sock, info)),
|
||||
EitherName::B(info) => EitherFuture::Second(self.1.upgrade_inbound(sock, info)),
|
||||
Either::Left(info) => EitherFuture::First(self.0.upgrade_inbound(sock, info)),
|
||||
Either::Right(info) => EitherFuture::Second(self.1.upgrade_inbound(sock, info)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -98,8 +95,8 @@ where
|
||||
|
||||
fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
|
||||
match info {
|
||||
EitherName::A(info) => EitherFuture::First(self.0.upgrade_outbound(sock, info)),
|
||||
EitherName::B(info) => EitherFuture::Second(self.1.upgrade_outbound(sock, info)),
|
||||
Either::Left(info) => EitherFuture::First(self.0.upgrade_outbound(sock, info)),
|
||||
Either::Right(info) => EitherFuture::Second(self.1.upgrade_outbound(sock, info)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user