protocols/relay: Limit inbound streams (#2698)

* protocols/relay: Use prost-codec

* protocols/relay: Respond to at most one incoming reservation request

Also changes poll order prioritizing

- Error handling over everything.
- Queued events over existing circuits.
- Existing circuits over accepting new circuits.
- Reservation management of existing reservation over new reservation
  requests.

* protocols/relay: Deny <= 8 incoming circuit requests with one per peer

* protocols/relay: Deny new circuits before accepting new circuits
This commit is contained in:
Max Inden
2022-06-08 17:33:24 +02:00
committed by GitHub
parent 0bce7f7fff
commit 0d3787ed04
10 changed files with 248 additions and 237 deletions

View File

@ -1,3 +1,14 @@
# 0.9.1 - unreleased
- Respond to at most one incoming reservation request. Deny <= 8 incoming
circuit requests with one per peer. And deny new circuits before accepting new
circuits. See [PR 2698].
- Expose explicits errors via `UpgradeError` instead of generic `io::Error`. See
[PR 2698].
[PR 2698]: https://github.com/libp2p/rust-libp2p/pull/2698/
# 0.9.0 # 0.9.0
- Update to `libp2p-core` `v0.33.0`. - Update to `libp2p-core` `v0.33.0`.

View File

@ -3,7 +3,7 @@ name = "libp2p-relay"
edition = "2021" edition = "2021"
rust-version = "1.56.1" rust-version = "1.56.1"
description = "Communications relaying for libp2p" description = "Communications relaying for libp2p"
version = "0.9.0" version = "0.9.1"
authors = ["Parity Technologies <admin@parity.io>", "Max Inden <mail@max-inden.de>"] authors = ["Parity Technologies <admin@parity.io>", "Max Inden <mail@max-inden.de>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"
@ -21,12 +21,12 @@ libp2p-core = { version = "0.33.0", path = "../../core", default-features = fals
libp2p-swarm = { version = "0.36.0", path = "../../swarm" } libp2p-swarm = { version = "0.36.0", path = "../../swarm" }
log = "0.4" log = "0.4"
pin-project = "1" pin-project = "1"
prost-codec = { version = "0.1", path = "../../misc/prost-codec" }
prost = "0.10" prost = "0.10"
rand = "0.8.4" rand = "0.8.4"
smallvec = "1.6.1" smallvec = "1.6.1"
static_assertions = "1" static_assertions = "1"
thiserror = "1.0" thiserror = "1.0"
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }
void = "1" void = "1"
[build-dependencies] [build-dependencies]

View File

@ -41,7 +41,7 @@ use libp2p_swarm::{
NotifyHandler, PollParameters, NotifyHandler, PollParameters,
}; };
use std::collections::{hash_map, HashMap, VecDeque}; use std::collections::{hash_map, HashMap, VecDeque};
use std::io::{Error, IoSlice}; use std::io::{Error, ErrorKind, IoSlice};
use std::ops::DerefMut; use std::ops::DerefMut;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -84,7 +84,7 @@ pub enum Event {
/// Denying an inbound circuit request failed. /// Denying an inbound circuit request failed.
InboundCircuitReqDenyFailed { InboundCircuitReqDenyFailed {
src_peer_id: PeerId, src_peer_id: PeerId,
error: std::io::Error, error: inbound_stop::UpgradeError,
}, },
} }
@ -320,7 +320,7 @@ impl NetworkBehaviour for Client {
/// A [`NegotiatedSubstream`] acting as a [`RelayedConnection`]. /// A [`NegotiatedSubstream`] acting as a [`RelayedConnection`].
pub enum RelayedConnection { pub enum RelayedConnection {
InboundAccepting { InboundAccepting {
accept: BoxFuture<'static, Result<RelayedConnection, std::io::Error>>, accept: BoxFuture<'static, Result<RelayedConnection, Error>>,
}, },
Operational { Operational {
read_buffer: Bytes, read_buffer: Bytes,
@ -338,7 +338,10 @@ impl RelayedConnection {
) -> Self { ) -> Self {
RelayedConnection::InboundAccepting { RelayedConnection::InboundAccepting {
accept: async { accept: async {
let (substream, read_buffer) = circuit.accept().await?; let (substream, read_buffer) = circuit
.accept()
.await
.map_err(|e| Error::new(ErrorKind::Other, e))?;
Ok(RelayedConnection::Operational { Ok(RelayedConnection::Operational {
read_buffer, read_buffer,
substream, substream,

View File

@ -39,11 +39,16 @@ use libp2p_swarm::{
KeepAlive, NegotiatedSubstream, SubstreamProtocol, KeepAlive, NegotiatedSubstream, SubstreamProtocol,
}; };
use log::debug; use log::debug;
use std::collections::VecDeque; use std::collections::{HashMap, VecDeque};
use std::fmt; use std::fmt;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
/// The maximum number of circuits being denied concurrently.
///
/// Circuits to be denied exceeding the limit are dropped.
const MAX_NUMBER_DENYING_CIRCUIT: usize = 8;
pub enum In { pub enum In {
Reserve { Reserve {
to_listener: mpsc::Sender<transport::ToListenerMsg>, to_listener: mpsc::Sender<transport::ToListenerMsg>,
@ -100,7 +105,7 @@ pub enum Event {
/// Denying an inbound circuit request failed. /// Denying an inbound circuit request failed.
InboundCircuitReqDenyFailed { InboundCircuitReqDenyFailed {
src_peer_id: PeerId, src_peer_id: PeerId,
error: std::io::Error, error: inbound_stop::UpgradeError,
}, },
} }
@ -196,7 +201,8 @@ pub struct Handler {
/// eventually. /// eventually.
alive_lend_out_substreams: FuturesUnordered<oneshot::Receiver<void::Void>>, alive_lend_out_substreams: FuturesUnordered<oneshot::Receiver<void::Void>>,
circuit_deny_futs: FuturesUnordered<BoxFuture<'static, (PeerId, Result<(), std::io::Error>)>>, circuit_deny_futs:
HashMap<PeerId, BoxFuture<'static, Result<(), protocol::inbound_stop::UpgradeError>>>,
/// Futures that try to send errors to the transport. /// Futures that try to send errors to the transport.
/// ///
@ -251,12 +257,27 @@ impl ConnectionHandler for Handler {
} }
Reservation::None => { Reservation::None => {
let src_peer_id = inbound_circuit.src_peer_id(); let src_peer_id = inbound_circuit.src_peer_id();
self.circuit_deny_futs.push(
inbound_circuit if self.circuit_deny_futs.len() == MAX_NUMBER_DENYING_CIRCUIT
.deny(Status::NoReservation) && !self.circuit_deny_futs.contains_key(&src_peer_id)
.map(move |result| (src_peer_id, result)) {
.boxed(), log::warn!(
"Dropping inbound circuit request to be denied from {:?} due to exceeding limit.",
src_peer_id,
);
} else if self
.circuit_deny_futs
.insert(
src_peer_id,
inbound_circuit.deny(Status::NoReservation).boxed(),
) )
.is_some()
{
log::warn!(
"Dropping existing inbound circuit request to be denied from {:?} in favor of new one.",
src_peer_id
)
}
} }
} }
} }
@ -537,20 +558,28 @@ impl ConnectionHandler for Handler {
} }
// Deny incoming circuit requests. // Deny incoming circuit requests.
if let Poll::Ready(Some((src_peer_id, result))) = self.circuit_deny_futs.poll_next_unpin(cx) let maybe_event =
{ self.circuit_deny_futs
match result { .iter_mut()
Ok(()) => { .find_map(|(src_peer_id, fut)| match fut.poll_unpin(cx) {
return Poll::Ready(ConnectionHandlerEvent::Custom( Poll::Ready(Ok(())) => Some((
Event::InboundCircuitReqDenied { src_peer_id }, *src_peer_id,
)) Event::InboundCircuitReqDenied {
} src_peer_id: *src_peer_id,
Err(error) => { },
return Poll::Ready(ConnectionHandlerEvent::Custom( )),
Event::InboundCircuitReqDenyFailed { src_peer_id, error }, Poll::Ready(Err(error)) => Some((
)) *src_peer_id,
} Event::InboundCircuitReqDenyFailed {
} src_peer_id: *src_peer_id,
error,
},
)),
Poll::Pending => None,
});
if let Some((src_peer_id, event)) = maybe_event {
self.circuit_deny_futs.remove(&src_peer_id);
return Poll::Ready(ConnectionHandlerEvent::Custom(event));
} }
// Send errors to transport. // Send errors to transport.

View File

@ -25,13 +25,10 @@ use bytes::Bytes;
use futures::{future::BoxFuture, prelude::*}; use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{upgrade, Multiaddr, PeerId}; use libp2p_core::{upgrade, Multiaddr, PeerId};
use libp2p_swarm::NegotiatedSubstream; use libp2p_swarm::NegotiatedSubstream;
use prost::Message;
use std::convert::TryInto; use std::convert::TryInto;
use std::io::Cursor;
use std::iter; use std::iter;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use thiserror::Error; use thiserror::Error;
use unsigned_varint::codec::UviBytes;
pub struct Upgrade { pub struct Upgrade {
pub reservation_duration: Duration, pub reservation_duration: Duration,
@ -54,23 +51,19 @@ impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>; type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
let mut codec = UviBytes::default(); let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE));
codec.set_max_len(MAX_MESSAGE_SIZE);
let mut substream = Framed::new(substream, codec);
async move { async move {
let msg: bytes::BytesMut = substream
.next()
.await
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??;
let HopMessage { let HopMessage {
r#type, r#type,
peer, peer,
reservation: _, reservation: _,
limit: _, limit: _,
status: _, status: _,
} = HopMessage::decode(Cursor::new(msg))?; } = substream
.next()
.await
.ok_or(FatalUpgradeError::StreamClosed)??;
let r#type = let r#type =
hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?;
@ -103,28 +96,22 @@ pub enum UpgradeError {
Fatal(#[from] FatalUpgradeError), Fatal(#[from] FatalUpgradeError),
} }
impl From<prost::DecodeError> for UpgradeError { impl From<prost_codec::Error> for UpgradeError {
fn from(error: prost::DecodeError) -> Self { fn from(error: prost_codec::Error) -> Self {
Self::Fatal(error.into())
}
}
impl From<std::io::Error> for UpgradeError {
fn from(error: std::io::Error) -> Self {
Self::Fatal(error.into()) Self::Fatal(error.into())
} }
} }
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum FatalUpgradeError { pub enum FatalUpgradeError {
#[error("Failed to decode message: {0}.")] #[error("Failed to encode or decode")]
Decode( Codec(
#[from] #[from]
#[source] #[source]
prost::DecodeError, prost_codec::Error,
), ),
#[error(transparent)] #[error("Stream closed")]
Io(#[from] std::io::Error), StreamClosed,
#[error("Failed to parse response type field.")] #[error("Failed to parse response type field.")]
ParseTypeField, ParseTypeField,
#[error("Failed to parse peer id.")] #[error("Failed to parse peer id.")]
@ -141,14 +128,14 @@ pub enum Req {
} }
pub struct ReservationReq { pub struct ReservationReq {
substream: Framed<NegotiatedSubstream, UviBytes<Cursor<Vec<u8>>>>, substream: Framed<NegotiatedSubstream, prost_codec::Codec<HopMessage>>,
reservation_duration: Duration, reservation_duration: Duration,
max_circuit_duration: Duration, max_circuit_duration: Duration,
max_circuit_bytes: u64, max_circuit_bytes: u64,
} }
impl ReservationReq { impl ReservationReq {
pub async fn accept(self, addrs: Vec<Multiaddr>) -> Result<(), std::io::Error> { pub async fn accept(self, addrs: Vec<Multiaddr>) -> Result<(), UpgradeError> {
let msg = HopMessage { let msg = HopMessage {
r#type: hop_message::Type::Status.into(), r#type: hop_message::Type::Status.into(),
peer: None, peer: None,
@ -175,7 +162,7 @@ impl ReservationReq {
self.send(msg).await self.send(msg).await
} }
pub async fn deny(self, status: Status) -> Result<(), std::io::Error> { pub async fn deny(self, status: Status) -> Result<(), UpgradeError> {
let msg = HopMessage { let msg = HopMessage {
r#type: hop_message::Type::Status.into(), r#type: hop_message::Type::Status.into(),
peer: None, peer: None,
@ -187,11 +174,8 @@ impl ReservationReq {
self.send(msg).await self.send(msg).await
} }
async fn send(mut self, msg: HopMessage) -> Result<(), std::io::Error> { async fn send(mut self, msg: HopMessage) -> Result<(), UpgradeError> {
let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); self.substream.send(msg).await?;
msg.encode(&mut encoded_msg)
.expect("Vec to have sufficient capacity.");
self.substream.send(Cursor::new(encoded_msg)).await?;
self.substream.flush().await?; self.substream.flush().await?;
self.substream.close().await?; self.substream.close().await?;
@ -201,7 +185,7 @@ impl ReservationReq {
pub struct CircuitReq { pub struct CircuitReq {
dst: PeerId, dst: PeerId,
substream: Framed<NegotiatedSubstream, UviBytes<Cursor<Vec<u8>>>>, substream: Framed<NegotiatedSubstream, prost_codec::Codec<HopMessage>>,
} }
impl CircuitReq { impl CircuitReq {
@ -209,7 +193,7 @@ impl CircuitReq {
self.dst self.dst
} }
pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), std::io::Error> { pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> {
let msg = HopMessage { let msg = HopMessage {
r#type: hop_message::Type::Status.into(), r#type: hop_message::Type::Status.into(),
peer: None, peer: None,
@ -234,7 +218,7 @@ impl CircuitReq {
Ok((io, read_buffer.freeze())) Ok((io, read_buffer.freeze()))
} }
pub async fn deny(mut self, status: Status) -> Result<(), std::io::Error> { pub async fn deny(mut self, status: Status) -> Result<(), UpgradeError> {
let msg = HopMessage { let msg = HopMessage {
r#type: hop_message::Type::Status.into(), r#type: hop_message::Type::Status.into(),
peer: None, peer: None,
@ -243,14 +227,11 @@ impl CircuitReq {
status: Some(status.into()), status: Some(status.into()),
}; };
self.send(msg).await?; self.send(msg).await?;
self.substream.close().await self.substream.close().await.map_err(Into::into)
} }
async fn send(&mut self, msg: HopMessage) -> Result<(), std::io::Error> { async fn send(&mut self, msg: HopMessage) -> Result<(), prost_codec::Error> {
let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); self.substream.send(msg).await?;
msg.encode(&mut encoded_msg)
.expect("Vec to have sufficient capacity.");
self.substream.send(Cursor::new(encoded_msg)).await?;
self.substream.flush().await?; self.substream.flush().await?;
Ok(()) Ok(())

View File

@ -25,11 +25,8 @@ use bytes::Bytes;
use futures::{future::BoxFuture, prelude::*}; use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{upgrade, PeerId}; use libp2p_core::{upgrade, PeerId};
use libp2p_swarm::NegotiatedSubstream; use libp2p_swarm::NegotiatedSubstream;
use prost::Message;
use std::io::Cursor;
use std::iter; use std::iter;
use thiserror::Error; use thiserror::Error;
use unsigned_varint::codec::UviBytes;
pub struct Upgrade {} pub struct Upgrade {}
@ -48,22 +45,18 @@ impl upgrade::InboundUpgrade<NegotiatedSubstream> for Upgrade {
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>; type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future {
let mut codec = UviBytes::default(); let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE));
codec.set_max_len(MAX_MESSAGE_SIZE);
let mut substream = Framed::new(substream, codec);
async move { async move {
let msg: bytes::BytesMut = substream
.next()
.await
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??;
let StopMessage { let StopMessage {
r#type, r#type,
peer, peer,
limit, limit,
status: _, status: _,
} = StopMessage::decode(Cursor::new(msg))?; } = substream
.next()
.await
.ok_or(FatalUpgradeError::StreamClosed)??;
let r#type = let r#type =
stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?;
@ -91,28 +84,22 @@ pub enum UpgradeError {
Fatal(#[from] FatalUpgradeError), Fatal(#[from] FatalUpgradeError),
} }
impl From<prost::DecodeError> for UpgradeError { impl From<prost_codec::Error> for UpgradeError {
fn from(error: prost::DecodeError) -> Self { fn from(error: prost_codec::Error) -> Self {
Self::Fatal(error.into())
}
}
impl From<std::io::Error> for UpgradeError {
fn from(error: std::io::Error) -> Self {
Self::Fatal(error.into()) Self::Fatal(error.into())
} }
} }
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum FatalUpgradeError { pub enum FatalUpgradeError {
#[error("Failed to decode message: {0}.")] #[error("Failed to encode or decode")]
Decode( Codec(
#[from] #[from]
#[source] #[source]
prost::DecodeError, prost_codec::Error,
), ),
#[error(transparent)] #[error("Stream closed")]
Io(#[from] std::io::Error), StreamClosed,
#[error("Failed to parse response type field.")] #[error("Failed to parse response type field.")]
ParseTypeField, ParseTypeField,
#[error("Failed to parse peer id.")] #[error("Failed to parse peer id.")]
@ -124,7 +111,7 @@ pub enum FatalUpgradeError {
} }
pub struct Circuit { pub struct Circuit {
substream: Framed<NegotiatedSubstream, UviBytes<Cursor<Vec<u8>>>>, substream: Framed<NegotiatedSubstream, prost_codec::Codec<StopMessage>>,
src_peer_id: PeerId, src_peer_id: PeerId,
limit: Option<protocol::Limit>, limit: Option<protocol::Limit>,
} }
@ -138,7 +125,7 @@ impl Circuit {
self.limit self.limit
} }
pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), std::io::Error> { pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> {
let msg = StopMessage { let msg = StopMessage {
r#type: stop_message::Type::Status.into(), r#type: stop_message::Type::Status.into(),
peer: None, peer: None,
@ -162,7 +149,7 @@ impl Circuit {
Ok((io, read_buffer.freeze())) Ok((io, read_buffer.freeze()))
} }
pub async fn deny(mut self, status: Status) -> Result<(), std::io::Error> { pub async fn deny(mut self, status: Status) -> Result<(), UpgradeError> {
let msg = StopMessage { let msg = StopMessage {
r#type: stop_message::Type::Status.into(), r#type: stop_message::Type::Status.into(),
peer: None, peer: None,
@ -170,14 +157,11 @@ impl Circuit {
status: Some(status.into()), status: Some(status.into()),
}; };
self.send(msg).await self.send(msg).await.map_err(Into::into)
} }
async fn send(&mut self, msg: StopMessage) -> Result<(), std::io::Error> { async fn send(&mut self, msg: StopMessage) -> Result<(), prost_codec::Error> {
let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); self.substream.send(msg).await?;
msg.encode(&mut encoded_msg)
.expect("Vec to have sufficient capacity.");
self.substream.send(Cursor::new(encoded_msg)).await?;
self.substream.flush().await?; self.substream.flush().await?;
Ok(()) Ok(())

View File

@ -26,13 +26,10 @@ use futures::{future::BoxFuture, prelude::*};
use futures_timer::Delay; use futures_timer::Delay;
use libp2p_core::{upgrade, Multiaddr, PeerId}; use libp2p_core::{upgrade, Multiaddr, PeerId};
use libp2p_swarm::NegotiatedSubstream; use libp2p_swarm::NegotiatedSubstream;
use prost::Message;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::io::Cursor;
use std::iter; use std::iter;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use thiserror::Error; use thiserror::Error;
use unsigned_varint::codec::UviBytes;
pub enum Upgrade { pub enum Upgrade {
Reserve, Reserve,
@ -74,28 +71,20 @@ impl upgrade::OutboundUpgrade<NegotiatedSubstream> for Upgrade {
}, },
}; };
let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE));
msg.encode(&mut encoded_msg)
.expect("Vec to have sufficient capacity.");
let mut codec = UviBytes::default();
codec.set_max_len(MAX_MESSAGE_SIZE);
let mut substream = Framed::new(substream, codec);
async move { async move {
substream.send(Cursor::new(encoded_msg)).await?; substream.send(msg).await?;
let msg: bytes::BytesMut = substream
.next()
.await
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??;
let HopMessage { let HopMessage {
r#type, r#type,
peer: _, peer: _,
reservation, reservation,
limit, limit,
status, status,
} = HopMessage::decode(Cursor::new(msg))?; } = substream
.next()
.await
.ok_or(FatalUpgradeError::StreamClosed)??;
let r#type = let r#type =
hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?;
@ -216,14 +205,8 @@ pub enum UpgradeError {
Fatal(#[from] FatalUpgradeError), Fatal(#[from] FatalUpgradeError),
} }
impl From<std::io::Error> for UpgradeError { impl From<prost_codec::Error> for UpgradeError {
fn from(error: std::io::Error) -> Self { fn from(error: prost_codec::Error) -> Self {
Self::Fatal(error.into())
}
}
impl From<prost::DecodeError> for UpgradeError {
fn from(error: prost::DecodeError) -> Self {
Self::Fatal(error.into()) Self::Fatal(error.into())
} }
} }
@ -250,14 +233,14 @@ pub enum ReservationFailedReason {
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum FatalUpgradeError { pub enum FatalUpgradeError {
#[error("Failed to decode message: {0}.")] #[error("Failed to encode or decode")]
Decode( Codec(
#[from] #[from]
#[source] #[source]
prost::DecodeError, prost_codec::Error,
), ),
#[error(transparent)] #[error("Stream closed")]
Io(#[from] std::io::Error), StreamClosed,
#[error("Expected 'status' field to be set.")] #[error("Expected 'status' field to be set.")]
MissingStatusField, MissingStatusField,
#[error("Expected 'reservation' field to be set.")] #[error("Expected 'reservation' field to be set.")]

View File

@ -25,13 +25,10 @@ use bytes::Bytes;
use futures::{future::BoxFuture, prelude::*}; use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{upgrade, PeerId}; use libp2p_core::{upgrade, PeerId};
use libp2p_swarm::NegotiatedSubstream; use libp2p_swarm::NegotiatedSubstream;
use prost::Message;
use std::convert::TryInto; use std::convert::TryInto;
use std::io::Cursor;
use std::iter; use std::iter;
use std::time::Duration; use std::time::Duration;
use thiserror::Error; use thiserror::Error;
use unsigned_varint::codec::UviBytes;
pub struct Upgrade { pub struct Upgrade {
pub relay_peer_id: PeerId, pub relay_peer_id: PeerId,
@ -72,27 +69,19 @@ impl upgrade::OutboundUpgrade<NegotiatedSubstream> for Upgrade {
status: None, status: None,
}; };
let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE));
msg.encode(&mut encoded_msg)
.expect("Vec to have sufficient capacity.");
let mut codec = UviBytes::default();
codec.set_max_len(MAX_MESSAGE_SIZE);
let mut substream = Framed::new(substream, codec);
async move { async move {
substream.send(std::io::Cursor::new(encoded_msg)).await?; substream.send(msg).await?;
let msg: bytes::BytesMut = substream
.next()
.await
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??;
let StopMessage { let StopMessage {
r#type, r#type,
peer: _, peer: _,
limit: _, limit: _,
status, status,
} = StopMessage::decode(Cursor::new(msg))?; } = substream
.next()
.await
.ok_or(FatalUpgradeError::StreamClosed)??;
let r#type = let r#type =
stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?;
@ -141,14 +130,8 @@ pub enum UpgradeError {
Fatal(#[from] FatalUpgradeError), Fatal(#[from] FatalUpgradeError),
} }
impl From<std::io::Error> for UpgradeError { impl From<prost_codec::Error> for UpgradeError {
fn from(error: std::io::Error) -> Self { fn from(error: prost_codec::Error) -> Self {
Self::Fatal(error.into())
}
}
impl From<prost::DecodeError> for UpgradeError {
fn from(error: prost::DecodeError) -> Self {
Self::Fatal(error.into()) Self::Fatal(error.into())
} }
} }
@ -163,14 +146,14 @@ pub enum CircuitFailedReason {
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum FatalUpgradeError { pub enum FatalUpgradeError {
#[error("Failed to decode message: {0}.")] #[error("Failed to encode or decode")]
Decode( Codec(
#[from] #[from]
#[source] #[source]
prost::DecodeError, prost_codec::Error,
), ),
#[error(transparent)] #[error("Stream closed")]
Io(#[from] std::io::Error), StreamClosed,
#[error("Expected 'status' field to be set.")] #[error("Expected 'status' field to be set.")]
MissingStatusField, MissingStatusField,
#[error("Failed to parse response type field.")] #[error("Failed to parse response type field.")]

View File

@ -138,14 +138,14 @@ pub enum Event {
/// Accepting an inbound reservation request failed. /// Accepting an inbound reservation request failed.
ReservationReqAcceptFailed { ReservationReqAcceptFailed {
src_peer_id: PeerId, src_peer_id: PeerId,
error: std::io::Error, error: inbound_hop::UpgradeError,
}, },
/// An inbound reservation request has been denied. /// An inbound reservation request has been denied.
ReservationReqDenied { src_peer_id: PeerId }, ReservationReqDenied { src_peer_id: PeerId },
/// Denying an inbound reservation request has failed. /// Denying an inbound reservation request has failed.
ReservationReqDenyFailed { ReservationReqDenyFailed {
src_peer_id: PeerId, src_peer_id: PeerId,
error: std::io::Error, error: inbound_hop::UpgradeError,
}, },
/// An inbound reservation has timed out. /// An inbound reservation has timed out.
ReservationTimedOut { src_peer_id: PeerId }, ReservationTimedOut { src_peer_id: PeerId },
@ -162,7 +162,7 @@ pub enum Event {
CircuitReqDenyFailed { CircuitReqDenyFailed {
src_peer_id: PeerId, src_peer_id: PeerId,
dst_peer_id: PeerId, dst_peer_id: PeerId,
error: std::io::Error, error: inbound_hop::UpgradeError,
}, },
/// An inbound cirucit request has been accepted. /// An inbound cirucit request has been accepted.
CircuitReqAccepted { CircuitReqAccepted {
@ -179,7 +179,7 @@ pub enum Event {
CircuitReqAcceptFailed { CircuitReqAcceptFailed {
src_peer_id: PeerId, src_peer_id: PeerId,
dst_peer_id: PeerId, dst_peer_id: PeerId,
error: std::io::Error, error: inbound_hop::UpgradeError,
}, },
/// An inbound circuit has closed. /// An inbound circuit has closed.
CircuitClosed { CircuitClosed {

View File

@ -153,11 +153,11 @@ pub enum Event {
renewed: bool, renewed: bool,
}, },
/// Accepting an inbound reservation request failed. /// Accepting an inbound reservation request failed.
ReservationReqAcceptFailed { error: std::io::Error }, ReservationReqAcceptFailed { error: inbound_hop::UpgradeError },
/// An inbound reservation request has been denied. /// An inbound reservation request has been denied.
ReservationReqDenied {}, ReservationReqDenied {},
/// Denying an inbound reservation request has failed. /// Denying an inbound reservation request has failed.
ReservationReqDenyFailed { error: std::io::Error }, ReservationReqDenyFailed { error: inbound_hop::UpgradeError },
/// An inbound reservation has timed out. /// An inbound reservation has timed out.
ReservationTimedOut {}, ReservationTimedOut {},
/// An inbound circuit request has been received. /// An inbound circuit request has been received.
@ -178,7 +178,7 @@ pub enum Event {
CircuitReqDenyFailed { CircuitReqDenyFailed {
circuit_id: Option<CircuitId>, circuit_id: Option<CircuitId>,
dst_peer_id: PeerId, dst_peer_id: PeerId,
error: std::io::Error, error: inbound_hop::UpgradeError,
}, },
/// An inbound cirucit request has been accepted. /// An inbound cirucit request has been accepted.
CircuitReqAccepted { CircuitReqAccepted {
@ -189,7 +189,7 @@ pub enum Event {
CircuitReqAcceptFailed { CircuitReqAcceptFailed {
circuit_id: CircuitId, circuit_id: CircuitId,
dst_peer_id: PeerId, dst_peer_id: PeerId,
error: std::io::Error, error: inbound_hop::UpgradeError,
}, },
/// An outbound substream for an inbound circuit request has been /// An outbound substream for an inbound circuit request has been
/// negotiated. /// negotiated.
@ -354,8 +354,7 @@ impl IntoConnectionHandler for Prototype {
config: self.config, config: self.config,
queued_events: Default::default(), queued_events: Default::default(),
pending_error: Default::default(), pending_error: Default::default(),
reservation_accept_futures: Default::default(), reservation_request_future: Default::default(),
reservation_deny_futures: Default::default(),
circuit_accept_futures: Default::default(), circuit_accept_futures: Default::default(),
circuit_deny_futures: Default::default(), circuit_deny_futures: Default::default(),
alive_lend_out_substreams: Default::default(), alive_lend_out_substreams: Default::default(),
@ -403,17 +402,20 @@ pub struct Handler {
/// Until when to keep the connection alive. /// Until when to keep the connection alive.
keep_alive: KeepAlive, keep_alive: KeepAlive,
/// Futures accepting an inbound reservation request. /// Future handling inbound reservation request.
reservation_accept_futures: Futures<Result<(), std::io::Error>>, reservation_request_future: Option<ReservationRequestFuture>,
/// Futures denying an inbound reservation request.
reservation_deny_futures: Futures<Result<(), std::io::Error>>,
/// Timeout for the currently active reservation. /// Timeout for the currently active reservation.
active_reservation: Option<Delay>, active_reservation: Option<Delay>,
/// Futures accepting an inbound circuit request. /// Futures accepting an inbound circuit request.
circuit_accept_futures: Futures<Result<CircuitParts, (CircuitId, PeerId, std::io::Error)>>, circuit_accept_futures:
Futures<Result<CircuitParts, (CircuitId, PeerId, inbound_hop::UpgradeError)>>,
/// Futures deying an inbound circuit request. /// Futures deying an inbound circuit request.
circuit_deny_futures: Futures<(Option<CircuitId>, PeerId, Result<(), std::io::Error>)>, circuit_deny_futures: Futures<(
Option<CircuitId>,
PeerId,
Result<(), inbound_hop::UpgradeError>,
)>,
/// Tracks substreams lend out to other [`Handler`]s. /// Tracks substreams lend out to other [`Handler`]s.
/// ///
/// Contains a [`futures::future::Future`] for each lend out substream that /// Contains a [`futures::future::Future`] for each lend out substream that
@ -427,6 +429,11 @@ pub struct Handler {
circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>, circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,
} }
enum ReservationRequestFuture {
Accepting(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>),
Denying(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>),
}
type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>; type Futures<T> = FuturesUnordered<BoxFuture<'static, T>>;
impl ConnectionHandler for Handler { impl ConnectionHandler for Handler {
@ -512,15 +519,29 @@ impl ConnectionHandler for Handler {
inbound_reservation_req, inbound_reservation_req,
addrs, addrs,
} => { } => {
self.reservation_accept_futures if self
.push(inbound_reservation_req.accept(addrs).boxed()); .reservation_request_future
.replace(ReservationRequestFuture::Accepting(
inbound_reservation_req.accept(addrs).boxed(),
))
.is_some()
{
log::warn!("Dropping existing deny/accept future in favor of new one.")
}
} }
In::DenyReservationReq { In::DenyReservationReq {
inbound_reservation_req, inbound_reservation_req,
status, status,
} => { } => {
self.reservation_deny_futures if self
.push(inbound_reservation_req.deny(status).boxed()); .reservation_request_future
.replace(ReservationRequestFuture::Denying(
inbound_reservation_req.deny(status).boxed(),
))
.is_some()
{
log::warn!("Dropping existing deny/accept future in favor of new one.")
}
} }
In::NegotiateOutboundConnect { In::NegotiateOutboundConnect {
circuit_id, circuit_id,
@ -723,6 +744,7 @@ impl ConnectionHandler for Handler {
return Poll::Ready(event); return Poll::Ready(event);
} }
// Progress existing circuits.
if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) = if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) =
self.circuits.poll_next_unpin(cx) self.circuits.poll_next_unpin(cx)
{ {
@ -744,40 +766,30 @@ impl ConnectionHandler for Handler {
} }
} }
if let Poll::Ready(Some(result)) = self.reservation_accept_futures.poll_next_unpin(cx) { // Deny new circuits.
if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) =
self.circuit_deny_futures.poll_next_unpin(cx)
{
match result { match result {
Ok(()) => { Ok(()) => {
let renewed = self return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitReqDenied {
.active_reservation circuit_id,
.replace(Delay::new(self.config.reservation_duration)) dst_peer_id,
.is_some(); }));
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::ReservationReqAccepted { renewed },
));
} }
Err(error) => { Err(error) => {
return Poll::Ready(ConnectionHandlerEvent::Custom( return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::ReservationReqAcceptFailed { error }, Event::CircuitReqDenyFailed {
)); circuit_id,
} dst_peer_id,
} error,
} },
if let Poll::Ready(Some(result)) = self.reservation_deny_futures.poll_next_unpin(cx) {
match result {
Ok(()) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::ReservationReqDenied {},
))
}
Err(error) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::ReservationReqDenyFailed { error },
)); ));
} }
} }
} }
// Accept new circuits.
if let Poll::Ready(Some(result)) = self.circuit_accept_futures.poll_next_unpin(cx) { if let Poll::Ready(Some(result)) = self.circuit_accept_futures.poll_next_unpin(cx) {
match result { match result {
Ok(parts) => { Ok(parts) => {
@ -838,32 +850,7 @@ impl ConnectionHandler for Handler {
} }
} }
if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) = // Check active reservation.
self.circuit_deny_futures.poll_next_unpin(cx)
{
match result {
Ok(()) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitReqDenied {
circuit_id,
dst_peer_id,
}));
}
Err(error) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::CircuitReqDenyFailed {
circuit_id,
dst_peer_id,
error,
},
));
}
}
}
while let Poll::Ready(Some(Err(Canceled))) =
self.alive_lend_out_substreams.poll_next_unpin(cx)
{}
if let Some(Poll::Ready(())) = self if let Some(Poll::Ready(())) = self
.active_reservation .active_reservation
.as_mut() .as_mut()
@ -875,8 +862,58 @@ impl ConnectionHandler for Handler {
)); ));
} }
if self.reservation_accept_futures.is_empty() // Progress reservation request.
&& self.reservation_deny_futures.is_empty() match self.reservation_request_future.as_mut() {
Some(ReservationRequestFuture::Accepting(fut)) => {
if let Poll::Ready(result) = fut.poll_unpin(cx) {
self.reservation_request_future = None;
match result {
Ok(()) => {
let renewed = self
.active_reservation
.replace(Delay::new(self.config.reservation_duration))
.is_some();
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::ReservationReqAccepted { renewed },
));
}
Err(error) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::ReservationReqAcceptFailed { error },
));
}
}
}
}
Some(ReservationRequestFuture::Denying(fut)) => {
if let Poll::Ready(result) = fut.poll_unpin(cx) {
self.reservation_request_future = None;
match result {
Ok(()) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::ReservationReqDenied {},
))
}
Err(error) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::ReservationReqDenyFailed { error },
));
}
}
}
}
None => {}
}
// Check lend out substreams.
while let Poll::Ready(Some(Err(Canceled))) =
self.alive_lend_out_substreams.poll_next_unpin(cx)
{}
// Check keep alive status.
if self.reservation_request_future.is_none()
&& self.circuit_accept_futures.is_empty() && self.circuit_accept_futures.is_empty()
&& self.circuit_deny_futures.is_empty() && self.circuit_deny_futures.is_empty()
&& self.alive_lend_out_substreams.is_empty() && self.alive_lend_out_substreams.is_empty()