feat(swarm): deprecate NegotiatedSubstream in favor of Stream

This patch tackles two things at once that are fairly intertwined:

1. There is no such thing as a "substream" in libp2p, the spec and other implementations only talk about "streams". We fix this by deprecating `NegotiatedSubstream`.
2. Previously, `NegotiatedSubstream` was a type alias that pointed to a type from `multistream-select`, effectively leaking the version of `multistream-select` to all dependencies of `libp2p-swarm`. We fix this by introducing a `Stream` newtype.

Resolves: #3759.
Related: #3748.

Pull-Request: #3912.
This commit is contained in:
Thomas Eizinger
2023-05-12 08:19:23 +02:00
committed by GitHub
parent 234a0d24db
commit 9e625881d5
24 changed files with 234 additions and 169 deletions

View File

@ -34,10 +34,12 @@ use crate::handler::{
FullyNegotiatedOutbound, ListenUpgradeError, ProtocolSupport, ProtocolsAdded, ProtocolsChange,
UpgradeInfoSend,
};
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper};
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
use crate::{
ConnectionHandlerEvent, KeepAlive, StreamProtocol, StreamUpgradeError, SubstreamProtocol,
ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, StreamUpgradeError,
SubstreamProtocol,
};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
@ -47,9 +49,7 @@ use libp2p_core::connection::ConnectedPoint;
use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt, SubstreamBox};
use libp2p_core::upgrade;
use libp2p_core::upgrade::{
InboundUpgradeApply, NegotiationError, OutboundUpgradeApply, ProtocolError,
};
use libp2p_core::upgrade::{NegotiationError, ProtocolError};
use libp2p_core::Endpoint;
use libp2p_identity::PeerId;
use std::collections::HashSet;
@ -120,16 +120,18 @@ where
handler: THandler,
/// Futures that upgrade incoming substreams.
negotiating_in: FuturesUnordered<
SubstreamUpgrade<
StreamUpgrade<
THandler::InboundOpenInfo,
InboundUpgradeApply<SubstreamBox, SendWrapper<THandler::InboundProtocol>>,
<THandler::InboundProtocol as InboundUpgradeSend>::Output,
<THandler::InboundProtocol as InboundUpgradeSend>::Error,
>,
>,
/// Futures that upgrade outgoing substreams.
negotiating_out: FuturesUnordered<
SubstreamUpgrade<
StreamUpgrade<
THandler::OutboundOpenInfo,
OutboundUpgradeApply<SubstreamBox, SendWrapper<THandler::OutboundProtocol>>,
<THandler::OutboundProtocol as OutboundUpgradeSend>::Output,
<THandler::OutboundProtocol as OutboundUpgradeSend>::Error,
>,
>,
/// The currently planned connection & handler shutdown.
@ -396,7 +398,7 @@ where
Poll::Ready(substream) => {
let (user_data, timeout, upgrade) = requested_substream.extract();
negotiating_out.push(SubstreamUpgrade::new_outbound(
negotiating_out.push(StreamUpgrade::new_outbound(
substream,
user_data,
timeout,
@ -415,7 +417,7 @@ where
Poll::Ready(substream) => {
let protocol = handler.listen_protocol();
negotiating_in.push(SubstreamUpgrade::new_inbound(substream, protocol));
negotiating_in.push(StreamUpgrade::new_inbound(substream, protocol));
continue; // Go back to the top, handler can potentially make progress again.
}
@ -470,24 +472,23 @@ impl<'a> IncomingInfo<'a> {
}
}
struct SubstreamUpgrade<UserData, Upgrade> {
struct StreamUpgrade<UserData, TOk, TErr> {
user_data: Option<UserData>,
timeout: Delay,
upgrade: Upgrade,
upgrade: BoxFuture<'static, Result<TOk, StreamUpgradeError<TErr>>>,
}
impl<UserData, Upgrade>
SubstreamUpgrade<UserData, OutboundUpgradeApply<SubstreamBox, SendWrapper<Upgrade>>>
where
Upgrade: Send + OutboundUpgradeSend,
{
fn new_outbound(
impl<UserData, TOk, TErr> StreamUpgrade<UserData, TOk, TErr> {
fn new_outbound<Upgrade>(
substream: SubstreamBox,
user_data: UserData,
timeout: Delay,
upgrade: Upgrade,
version_override: Option<upgrade::Version>,
) -> Self {
) -> Self
where
Upgrade: OutboundUpgradeSend<Output = TOk, Error = TErr>,
{
let effective_version = match version_override {
Some(version_override) if version_override != upgrade::Version::default() => {
log::debug!(
@ -500,45 +501,77 @@ where
}
_ => upgrade::Version::default(),
};
let protocols = upgrade.protocol_info();
Self {
user_data: Some(user_data),
timeout,
upgrade: upgrade::apply_outbound(substream, SendWrapper(upgrade), effective_version),
upgrade: Box::pin(async move {
let (info, stream) = multistream_select::dialer_select_proto(
substream,
protocols,
effective_version,
)
.await
.map_err(to_stream_upgrade_error)?;
let output = upgrade
.upgrade_outbound(Stream::new(stream), info)
.await
.map_err(StreamUpgradeError::Apply)?;
Ok(output)
}),
}
}
}
impl<UserData, Upgrade>
SubstreamUpgrade<UserData, InboundUpgradeApply<SubstreamBox, SendWrapper<Upgrade>>>
where
Upgrade: Send + InboundUpgradeSend,
{
fn new_inbound(
impl<UserData, TOk, TErr> StreamUpgrade<UserData, TOk, TErr> {
fn new_inbound<Upgrade>(
substream: SubstreamBox,
protocol: SubstreamProtocol<Upgrade, UserData>,
) -> Self {
) -> Self
where
Upgrade: InboundUpgradeSend<Output = TOk, Error = TErr>,
{
let timeout = *protocol.timeout();
let (upgrade, open_info) = protocol.into_upgrade();
let protocols = upgrade.protocol_info();
Self {
user_data: Some(open_info),
timeout: Delay::new(timeout),
upgrade: upgrade::apply_inbound(substream, SendWrapper(upgrade)),
upgrade: Box::pin(async move {
let (info, stream) =
multistream_select::listener_select_proto(substream, protocols)
.await
.map_err(to_stream_upgrade_error)?;
let output = upgrade
.upgrade_inbound(Stream::new(stream), info)
.await
.map_err(StreamUpgradeError::Apply)?;
Ok(output)
}),
}
}
}
impl<UserData, Upgrade> Unpin for SubstreamUpgrade<UserData, Upgrade> {}
fn to_stream_upgrade_error<T>(e: NegotiationError) -> StreamUpgradeError<T> {
match e {
NegotiationError::Failed => StreamUpgradeError::NegotiationFailed,
NegotiationError::ProtocolError(ProtocolError::IoError(e)) => StreamUpgradeError::Io(e),
NegotiationError::ProtocolError(other) => {
StreamUpgradeError::Io(io::Error::new(io::ErrorKind::Other, other))
}
}
}
impl<UserData, Upgrade, UpgradeOutput, TUpgradeError> Future for SubstreamUpgrade<UserData, Upgrade>
where
Upgrade: Future<Output = Result<UpgradeOutput, upgrade::UpgradeError<TUpgradeError>>> + Unpin,
{
type Output = (
UserData,
Result<UpgradeOutput, StreamUpgradeError<TUpgradeError>>,
);
impl<UserData, TOk, TErr> Unpin for StreamUpgrade<UserData, TOk, TErr> {}
impl<UserData, TOk, TErr> Future for StreamUpgrade<UserData, TOk, TErr> {
type Output = (UserData, Result<TOk, StreamUpgradeError<TErr>>);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.timeout.poll_unpin(cx) {
@ -560,21 +593,7 @@ where
.take()
.expect("Future not to be polled again once ready.");
Poll::Ready((
user_data,
result.map_err(|e| match e {
upgrade::UpgradeError::Select(NegotiationError::Failed) => {
StreamUpgradeError::NegotiationFailed
}
upgrade::UpgradeError::Select(NegotiationError::ProtocolError(
ProtocolError::IoError(e),
)) => StreamUpgradeError::Io(e),
upgrade::UpgradeError::Select(NegotiationError::ProtocolError(other)) => {
StreamUpgradeError::Io(io::Error::new(io::ErrorKind::Other, other))
}
upgrade::UpgradeError::Apply(e) => StreamUpgradeError::Apply(e),
}),
))
Poll::Ready((user_data, result))
}
}