protocols/rendezvous: Implement protocol (#2107)

Implement the libp2p rendezvous protocol.

> A lightweight mechanism for generalized peer discovery. It can be used for
bootstrap purposes, real time peer discovery, application specific routing, and
so on.

Co-authored-by: rishflab <rishflab@hotmail.com>
Co-authored-by: Daniel Karzel <daniel@comit.network>
This commit is contained in:
Thomas Eizinger
2021-09-08 00:36:52 +10:00
committed by GitHub
parent c1ae8a046c
commit adcfdc0750
32 changed files with 4463 additions and 25 deletions

View File

@ -0,0 +1,337 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl};
use crate::handler;
use crate::handler::outbound;
use crate::handler::outbound::OpenInfo;
use crate::substream_handler::SubstreamProtocolsHandler;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use libp2p_core::connection::ConnectionId;
use libp2p_core::identity::error::SigningError;
use libp2p_core::identity::Keypair;
use libp2p_core::{Multiaddr, PeerId, PeerRecord};
use libp2p_swarm::{
CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
};
use std::collections::{HashMap, VecDeque};
use std::iter::FromIterator;
use std::task::{Context, Poll};
use std::time::Duration;
pub struct Behaviour {
events: VecDeque<
NetworkBehaviourAction<
Event,
SubstreamProtocolsHandler<void::Void, outbound::Stream, outbound::OpenInfo>,
>,
>,
keypair: Keypair,
pending_register_requests: Vec<(Namespace, PeerId, Option<Ttl>)>,
/// Hold addresses of all peers that we have discovered so far.
///
/// Storing these internally allows us to assist the [`libp2p_swarm::Swarm`] in dialing by returning addresses from [`NetworkBehaviour::addresses_of_peer`].
discovered_peers: HashMap<(PeerId, Namespace), Vec<Multiaddr>>,
/// Tracks the expiry of registrations that we have discovered and stored in `discovered_peers` otherwise we have a memory leak.
expiring_registrations: FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>,
}
impl Behaviour {
/// Create a new instance of the rendezvous [`NetworkBehaviour`].
pub fn new(keypair: Keypair) -> Self {
Self {
events: Default::default(),
keypair,
pending_register_requests: vec![],
discovered_peers: Default::default(),
expiring_registrations: FuturesUnordered::from_iter(vec![
futures::future::pending().boxed()
]),
}
}
/// Register our external addresses in the given namespace with the given rendezvous peer.
///
/// External addresses are either manually added via [`libp2p_swarm::Swarm::add_external_address`] or reported
/// by other [`NetworkBehaviour`]s via [`NetworkBehaviourAction::ReportObservedAddr`].
pub fn register(&mut self, namespace: Namespace, rendezvous_node: PeerId, ttl: Option<Ttl>) {
self.pending_register_requests
.push((namespace, rendezvous_node, ttl));
}
/// Unregister ourselves from the given namespace with the given rendezvous peer.
pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: rendezvous_node,
event: handler::OutboundInEvent::NewSubstream {
open_info: OpenInfo::UnregisterRequest(namespace),
},
handler: NotifyHandler::Any,
});
}
/// Discover other peers at a given rendezvous peer.
///
/// If desired, the registrations can be filtered by a namespace.
/// If no namespace is given, peers from all namespaces will be returned.
/// A successfully discovery returns a cookie within [`Event::Discovered`].
/// Such a cookie can be used to only fetch the _delta_ of registrations since
/// the cookie was acquired.
pub fn discover(
&mut self,
ns: Option<Namespace>,
cookie: Option<Cookie>,
limit: Option<u64>,
rendezvous_node: PeerId,
) {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: rendezvous_node,
event: handler::OutboundInEvent::NewSubstream {
open_info: OpenInfo::DiscoverRequest {
namespace: ns,
cookie,
limit,
},
},
handler: NotifyHandler::Any,
});
}
}
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
#[error("We don't know about any externally reachable addresses of ours")]
NoExternalAddresses,
#[error("Failed to make a new PeerRecord")]
FailedToMakeRecord(#[from] SigningError),
#[error("Failed to register with Rendezvous node")]
Remote {
rendezvous_node: PeerId,
namespace: Namespace,
error: ErrorCode,
},
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
/// We successfully discovered other nodes with using the contained rendezvous node.
Discovered {
rendezvous_node: PeerId,
registrations: Vec<Registration>,
cookie: Cookie,
},
/// We failed to discover other nodes on the contained rendezvous node.
DiscoverFailed {
rendezvous_node: PeerId,
namespace: Option<Namespace>,
error: ErrorCode,
},
/// We successfully registered with the contained rendezvous node.
Registered {
rendezvous_node: PeerId,
ttl: Ttl,
namespace: Namespace,
},
/// We failed to register with the contained rendezvous node.
RegisterFailed(RegisterError),
/// The connection details we learned from this node expired.
Expired { peer: PeerId },
}
impl NetworkBehaviour for Behaviour {
type ProtocolsHandler =
SubstreamProtocolsHandler<void::Void, outbound::Stream, outbound::OpenInfo>;
type OutEvent = Event;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
let initial_keep_alive = Duration::from_secs(30);
SubstreamProtocolsHandler::new_outbound_only(initial_keep_alive)
}
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
self.discovered_peers
.iter()
.filter_map(|((candidate, _), addresses)| (candidate == peer).then(|| addresses))
.flatten()
.cloned()
.collect()
}
fn inject_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: handler::OutboundOutEvent,
) {
let new_events = match event {
handler::OutboundOutEvent::InboundEvent { message, .. } => void::unreachable(message),
handler::OutboundOutEvent::OutboundEvent { message, .. } => handle_outbound_event(
message,
peer_id,
&mut self.discovered_peers,
&mut self.expiring_registrations,
),
handler::OutboundOutEvent::InboundError { error, .. } => void::unreachable(error),
handler::OutboundOutEvent::OutboundError { error, .. } => {
log::warn!("Connection with peer {} failed: {}", peer_id, error);
vec![NetworkBehaviourAction::CloseConnection {
peer_id,
connection: CloseConnection::One(connection_id),
}]
}
};
self.events.extend(new_events);
}
fn poll(
&mut self,
cx: &mut Context<'_>,
poll_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
if let Some((namespace, rendezvous_node, ttl)) = self.pending_register_requests.pop() {
// Update our external addresses based on the Swarm's current knowledge.
// It doesn't make sense to register addresses on which we are not reachable, hence this should not be configurable from the outside.
let external_addresses = poll_params
.external_addresses()
.map(|r| r.addr)
.collect::<Vec<Multiaddr>>();
if external_addresses.is_empty() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
Event::RegisterFailed(RegisterError::NoExternalAddresses),
));
}
let action = match PeerRecord::new(self.keypair.clone(), external_addresses) {
Ok(peer_record) => NetworkBehaviourAction::NotifyHandler {
peer_id: rendezvous_node,
event: handler::OutboundInEvent::NewSubstream {
open_info: OpenInfo::RegisterRequest(NewRegistration {
namespace,
record: peer_record,
ttl,
}),
},
handler: NotifyHandler::Any,
},
Err(signing_error) => NetworkBehaviourAction::GenerateEvent(Event::RegisterFailed(
RegisterError::FailedToMakeRecord(signing_error),
)),
};
return Poll::Ready(action);
}
if let Some(expired_registration) =
futures::ready!(self.expiring_registrations.poll_next_unpin(cx))
{
self.discovered_peers.remove(&expired_registration);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(Event::Expired {
peer: expired_registration.0,
}));
}
Poll::Pending
}
}
fn handle_outbound_event(
event: outbound::OutEvent,
peer_id: PeerId,
discovered_peers: &mut HashMap<(PeerId, Namespace), Vec<Multiaddr>>,
expiring_registrations: &mut FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>,
) -> Vec<
NetworkBehaviourAction<
Event,
SubstreamProtocolsHandler<void::Void, outbound::Stream, outbound::OpenInfo>,
>,
> {
match event {
outbound::OutEvent::Registered { namespace, ttl } => {
vec![NetworkBehaviourAction::GenerateEvent(Event::Registered {
rendezvous_node: peer_id,
ttl,
namespace,
})]
}
outbound::OutEvent::RegisterFailed(namespace, error) => {
vec![NetworkBehaviourAction::GenerateEvent(
Event::RegisterFailed(RegisterError::Remote {
rendezvous_node: peer_id,
namespace,
error,
}),
)]
}
outbound::OutEvent::Discovered {
registrations,
cookie,
} => {
discovered_peers.extend(registrations.iter().map(|registration| {
let peer_id = registration.record.peer_id();
let namespace = registration.namespace.clone();
let addresses = registration.record.addresses().to_vec();
((peer_id, namespace), addresses)
}));
expiring_registrations.extend(registrations.iter().cloned().map(|registration| {
async move {
// if the timer errors we consider it expired
let _ =
wasm_timer::Delay::new(Duration::from_secs(registration.ttl as u64)).await;
(registration.record.peer_id(), registration.namespace)
}
.boxed()
}));
vec![NetworkBehaviourAction::GenerateEvent(Event::Discovered {
rendezvous_node: peer_id,
registrations,
cookie,
})]
}
outbound::OutEvent::DiscoverFailed { namespace, error } => {
vec![NetworkBehaviourAction::GenerateEvent(
Event::DiscoverFailed {
rendezvous_node: peer_id,
namespace,
error,
},
)]
}
}
}

View File

@ -0,0 +1,622 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::DEFAULT_TTL;
use asynchronous_codec::{Bytes, BytesMut, Decoder, Encoder};
use libp2p_core::{peer_record, signed_envelope, PeerRecord, SignedEnvelope};
use rand::RngCore;
use std::convert::{TryFrom, TryInto};
use std::fmt;
use unsigned_varint::codec::UviBytes;
pub type Ttl = u64;
#[derive(Debug, Clone)]
pub enum Message {
Register(NewRegistration),
RegisterResponse(Result<Ttl, ErrorCode>),
Unregister(Namespace),
Discover {
namespace: Option<Namespace>,
cookie: Option<Cookie>,
limit: Option<Ttl>,
},
DiscoverResponse(Result<(Vec<Registration>, Cookie), ErrorCode>),
}
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct Namespace(String);
impl Namespace {
/// Creates a new [`Namespace`] from a static string.
///
/// This will panic if the namespace is too long. We accepting panicking in this case because we are enforcing a `static lifetime which means this value can only be a constant in the program and hence we hope the developer checked that it is of an acceptable length.
pub fn from_static(value: &'static str) -> Self {
if value.len() > 255 {
panic!("Namespace '{}' is too long!", value)
}
Namespace(value.to_owned())
}
pub fn new(value: String) -> Result<Self, NamespaceTooLong> {
if value.len() > 255 {
return Err(NamespaceTooLong);
}
Ok(Namespace(value))
}
}
impl From<Namespace> for String {
fn from(namespace: Namespace) -> Self {
namespace.0
}
}
impl fmt::Display for Namespace {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl PartialEq<str> for Namespace {
fn eq(&self, other: &str) -> bool {
self.0.eq(other)
}
}
impl PartialEq<Namespace> for str {
fn eq(&self, other: &Namespace) -> bool {
other.0.eq(self)
}
}
#[derive(Debug, thiserror::Error)]
#[error("Namespace is too long")]
pub struct NamespaceTooLong;
#[derive(Debug, Eq, PartialEq, Hash, Clone)]
pub struct Cookie {
id: u64,
namespace: Option<Namespace>,
}
impl Cookie {
/// Construct a new [`Cookie`] for a given namespace.
///
/// This cookie will only be valid for subsequent DISCOVER requests targeting the same namespace.
pub fn for_namespace(namespace: Namespace) -> Self {
Self {
id: rand::thread_rng().next_u64(),
namespace: Some(namespace),
}
}
/// Construct a new [`Cookie`] for a DISCOVER request that inquires about all namespaces.
pub fn for_all_namespaces() -> Self {
Self {
id: rand::random(),
namespace: None,
}
}
pub fn into_wire_encoding(self) -> Vec<u8> {
let id_bytes = self.id.to_be_bytes();
let namespace = self.namespace.map(|ns| ns.0).unwrap_or_default();
let mut buffer = Vec::with_capacity(id_bytes.len() + namespace.len());
buffer.extend_from_slice(&id_bytes);
buffer.extend_from_slice(namespace.as_bytes());
buffer
}
pub fn from_wire_encoding(mut bytes: Vec<u8>) -> Result<Self, InvalidCookie> {
// check length early to avoid panic during slicing
if bytes.len() < 8 {
return Err(InvalidCookie);
}
let namespace = bytes.split_off(8);
let namespace = if namespace.is_empty() {
None
} else {
Some(
Namespace::new(String::from_utf8(namespace).map_err(|_| InvalidCookie)?)
.map_err(|_| InvalidCookie)?,
)
};
let bytes = <[u8; 8]>::try_from(bytes).map_err(|_| InvalidCookie)?;
let id = u64::from_be_bytes(bytes);
Ok(Self { id, namespace })
}
pub fn namespace(&self) -> Option<&Namespace> {
self.namespace.as_ref()
}
}
#[derive(Debug, thiserror::Error)]
#[error("The cookie was malformed")]
pub struct InvalidCookie;
#[derive(Debug, Clone)]
pub struct NewRegistration {
pub namespace: Namespace,
pub record: PeerRecord,
pub ttl: Option<u64>,
}
impl NewRegistration {
pub fn new(namespace: Namespace, record: PeerRecord, ttl: Option<Ttl>) -> Self {
Self {
namespace,
record,
ttl,
}
}
pub fn effective_ttl(&self) -> Ttl {
self.ttl.unwrap_or(DEFAULT_TTL)
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct Registration {
pub namespace: Namespace,
pub record: PeerRecord,
pub ttl: Ttl,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ErrorCode {
InvalidNamespace,
InvalidSignedPeerRecord,
InvalidTtl,
InvalidCookie,
NotAuthorized,
InternalError,
Unavailable,
}
pub struct RendezvousCodec {
/// Codec to encode/decode the Unsigned varint length prefix of the frames.
length_codec: UviBytes,
}
impl Default for RendezvousCodec {
fn default() -> Self {
let mut length_codec = UviBytes::default();
length_codec.set_max_len(1024 * 1024); // 1MB
Self { length_codec }
}
}
impl Encoder for RendezvousCodec {
type Item = Message;
type Error = Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
use prost::Message;
let message = wire::Message::from(item);
let mut buf = Vec::with_capacity(message.encoded_len());
message
.encode(&mut buf)
.expect("Buffer has sufficient capacity");
// Length prefix the protobuf message, ensuring the max limit is not hit
self.length_codec.encode(Bytes::from(buf), dst)?;
Ok(())
}
}
impl Decoder for RendezvousCodec {
type Item = Message;
type Error = Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
use prost::Message;
let message = match self.length_codec.decode(src)? {
Some(p) => p,
None => return Ok(None),
};
let message = wire::Message::decode(message)?;
Ok(Some(message.try_into()?))
}
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Failed to encode message as bytes")]
Encode(#[from] prost::EncodeError),
#[error("Failed to decode message from bytes")]
Decode(#[from] prost::DecodeError),
#[error("Failed to read/write")]
Io(#[from] std::io::Error),
#[error("Failed to convert wire message to internal data model")]
ConversionError(#[from] ConversionError),
}
impl From<Message> for wire::Message {
fn from(message: Message) -> Self {
use wire::message::*;
match message {
Message::Register(NewRegistration {
namespace,
record,
ttl,
}) => wire::Message {
r#type: Some(MessageType::Register.into()),
register: Some(Register {
ns: Some(namespace.into()),
ttl,
signed_peer_record: Some(
record.into_signed_envelope().into_protobuf_encoding(),
),
}),
register_response: None,
unregister: None,
discover: None,
discover_response: None,
},
Message::RegisterResponse(Ok(ttl)) => wire::Message {
r#type: Some(MessageType::RegisterResponse.into()),
register_response: Some(RegisterResponse {
status: Some(ResponseStatus::Ok.into()),
status_text: None,
ttl: Some(ttl),
}),
register: None,
discover: None,
unregister: None,
discover_response: None,
},
Message::RegisterResponse(Err(error)) => wire::Message {
r#type: Some(MessageType::RegisterResponse.into()),
register_response: Some(RegisterResponse {
status: Some(ResponseStatus::from(error).into()),
status_text: None,
ttl: None,
}),
register: None,
discover: None,
unregister: None,
discover_response: None,
},
Message::Unregister(namespace) => wire::Message {
r#type: Some(MessageType::Unregister.into()),
unregister: Some(Unregister {
ns: Some(namespace.into()),
id: None,
}),
register: None,
register_response: None,
discover: None,
discover_response: None,
},
Message::Discover {
namespace,
cookie,
limit,
} => wire::Message {
r#type: Some(MessageType::Discover.into()),
discover: Some(Discover {
ns: namespace.map(|ns| ns.into()),
cookie: cookie.map(|cookie| cookie.into_wire_encoding()),
limit,
}),
register: None,
register_response: None,
unregister: None,
discover_response: None,
},
Message::DiscoverResponse(Ok((registrations, cookie))) => wire::Message {
r#type: Some(MessageType::DiscoverResponse.into()),
discover_response: Some(DiscoverResponse {
registrations: registrations
.into_iter()
.map(|reggo| Register {
ns: Some(reggo.namespace.into()),
ttl: Some(reggo.ttl),
signed_peer_record: Some(
reggo.record.into_signed_envelope().into_protobuf_encoding(),
),
})
.collect(),
status: Some(ResponseStatus::Ok.into()),
status_text: None,
cookie: Some(cookie.into_wire_encoding()),
}),
register: None,
discover: None,
unregister: None,
register_response: None,
},
Message::DiscoverResponse(Err(error)) => wire::Message {
r#type: Some(MessageType::DiscoverResponse.into()),
discover_response: Some(DiscoverResponse {
registrations: Vec::new(),
status: Some(ResponseStatus::from(error).into()),
status_text: None,
cookie: None,
}),
register: None,
discover: None,
unregister: None,
register_response: None,
},
}
}
}
impl TryFrom<wire::Message> for Message {
type Error = ConversionError;
fn try_from(message: wire::Message) -> Result<Self, Self::Error> {
use wire::message::*;
let message = match message {
wire::Message {
r#type: Some(0),
register:
Some(Register {
ns,
ttl,
signed_peer_record: Some(signed_peer_record),
}),
..
} => Message::Register(NewRegistration {
namespace: ns
.map(Namespace::new)
.transpose()?
.ok_or(ConversionError::MissingNamespace)?,
ttl,
record: PeerRecord::from_signed_envelope(SignedEnvelope::from_protobuf_encoding(
&signed_peer_record,
)?)?,
}),
wire::Message {
r#type: Some(1),
register_response:
Some(RegisterResponse {
status: Some(0),
ttl,
..
}),
..
} => Message::RegisterResponse(Ok(ttl.ok_or(ConversionError::MissingTtl)?)),
wire::Message {
r#type: Some(3),
discover: Some(Discover { ns, limit, cookie }),
..
} => Message::Discover {
namespace: ns.map(Namespace::new).transpose()?,
cookie: cookie.map(Cookie::from_wire_encoding).transpose()?,
limit,
},
wire::Message {
r#type: Some(4),
discover_response:
Some(DiscoverResponse {
registrations,
status: Some(0),
cookie: Some(cookie),
..
}),
..
} => {
let registrations = registrations
.into_iter()
.map(|reggo| {
Ok(Registration {
namespace: reggo
.ns
.map(Namespace::new)
.transpose()?
.ok_or(ConversionError::MissingNamespace)?,
record: PeerRecord::from_signed_envelope(
SignedEnvelope::from_protobuf_encoding(
&reggo
.signed_peer_record
.ok_or(ConversionError::MissingSignedPeerRecord)?,
)?,
)?,
ttl: reggo.ttl.ok_or(ConversionError::MissingTtl)?,
})
})
.collect::<Result<Vec<_>, ConversionError>>()?;
let cookie = Cookie::from_wire_encoding(cookie)?;
Message::DiscoverResponse(Ok((registrations, cookie)))
}
wire::Message {
r#type: Some(1),
register_response:
Some(RegisterResponse {
status: Some(error_code),
..
}),
..
} => {
let error_code = wire::message::ResponseStatus::from_i32(error_code)
.ok_or(ConversionError::BadStatusCode)?
.try_into()?;
Message::RegisterResponse(Err(error_code))
}
wire::Message {
r#type: Some(2),
unregister: Some(Unregister { ns, .. }),
..
} => Message::Unregister(
ns.map(Namespace::new)
.transpose()?
.ok_or(ConversionError::MissingNamespace)?,
),
wire::Message {
r#type: Some(4),
discover_response:
Some(DiscoverResponse {
status: Some(error_code),
..
}),
..
} => {
let error = wire::message::ResponseStatus::from_i32(error_code)
.ok_or(ConversionError::BadStatusCode)?
.try_into()?;
Message::DiscoverResponse(Err(error))
}
_ => return Err(ConversionError::InconsistentWireMessage),
};
Ok(message)
}
}
#[derive(Debug, thiserror::Error)]
pub enum ConversionError {
#[error("The wire message is consistent")]
InconsistentWireMessage,
#[error("Missing namespace field")]
MissingNamespace,
#[error("Invalid namespace")]
InvalidNamespace(#[from] NamespaceTooLong),
#[error("Missing signed peer record field")]
MissingSignedPeerRecord,
#[error("Missing TTL field")]
MissingTtl,
#[error("Bad status code")]
BadStatusCode,
#[error("Failed to decode signed envelope")]
BadSignedEnvelope(#[from] signed_envelope::DecodingError),
#[error("Failed to decode envelope as signed peer record")]
BadSignedPeerRecord(#[from] peer_record::FromEnvelopeError),
#[error(transparent)]
BadCookie(#[from] InvalidCookie),
#[error("The requested PoW difficulty is out of range")]
PoWDifficultyOutOfRange,
#[error("The provided PoW hash is not 32 bytes long")]
BadPoWHash,
}
impl ConversionError {
pub fn to_error_code(&self) -> ErrorCode {
match self {
ConversionError::MissingNamespace => ErrorCode::InvalidNamespace,
ConversionError::MissingSignedPeerRecord => ErrorCode::InvalidSignedPeerRecord,
ConversionError::BadSignedEnvelope(_) => ErrorCode::InvalidSignedPeerRecord,
ConversionError::BadSignedPeerRecord(_) => ErrorCode::InvalidSignedPeerRecord,
ConversionError::BadCookie(_) => ErrorCode::InvalidCookie,
ConversionError::MissingTtl => ErrorCode::InvalidTtl,
ConversionError::InconsistentWireMessage => ErrorCode::InternalError,
ConversionError::BadStatusCode => ErrorCode::InternalError,
ConversionError::PoWDifficultyOutOfRange => ErrorCode::InternalError,
ConversionError::BadPoWHash => ErrorCode::InternalError,
ConversionError::InvalidNamespace(_) => ErrorCode::InvalidNamespace,
}
}
}
impl TryFrom<wire::message::ResponseStatus> for ErrorCode {
type Error = UnmappableStatusCode;
fn try_from(value: wire::message::ResponseStatus) -> Result<Self, Self::Error> {
use wire::message::ResponseStatus::*;
let code = match value {
Ok => return Err(UnmappableStatusCode(value)),
EInvalidNamespace => ErrorCode::InvalidNamespace,
EInvalidSignedPeerRecord => ErrorCode::InvalidSignedPeerRecord,
EInvalidTtl => ErrorCode::InvalidTtl,
EInvalidCookie => ErrorCode::InvalidCookie,
ENotAuthorized => ErrorCode::NotAuthorized,
EInternalError => ErrorCode::InternalError,
EUnavailable => ErrorCode::Unavailable,
};
Result::Ok(code)
}
}
impl From<ErrorCode> for wire::message::ResponseStatus {
fn from(error_code: ErrorCode) -> Self {
use wire::message::ResponseStatus::*;
match error_code {
ErrorCode::InvalidNamespace => EInvalidNamespace,
ErrorCode::InvalidSignedPeerRecord => EInvalidSignedPeerRecord,
ErrorCode::InvalidTtl => EInvalidTtl,
ErrorCode::InvalidCookie => EInvalidCookie,
ErrorCode::NotAuthorized => ENotAuthorized,
ErrorCode::InternalError => EInternalError,
ErrorCode::Unavailable => EUnavailable,
}
}
}
impl From<UnmappableStatusCode> for ConversionError {
fn from(_: UnmappableStatusCode) -> Self {
ConversionError::InconsistentWireMessage
}
}
#[derive(Debug, thiserror::Error)]
#[error("The response code ({0:?}) cannot be mapped to our ErrorCode enum")]
pub struct UnmappableStatusCode(wire::message::ResponseStatus);
mod wire {
include!(concat!(env!("OUT_DIR"), "/rendezvous.pb.rs"));
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cookie_wire_encoding_roundtrip() {
let cookie = Cookie::for_namespace(Namespace::from_static("foo"));
let bytes = cookie.clone().into_wire_encoding();
let parsed = Cookie::from_wire_encoding(bytes).unwrap();
assert_eq!(parsed, cookie);
}
#[test]
fn cookie_wire_encoding_length() {
let cookie = Cookie::for_namespace(Namespace::from_static("foo"));
let bytes = cookie.into_wire_encoding();
assert_eq!(bytes.len(), 8 + 3)
}
}

View File

@ -0,0 +1,48 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::codec;
use crate::codec::Message;
use void::Void;
const PROTOCOL_IDENT: &[u8] = b"/rendezvous/1.0.0";
pub mod inbound;
pub mod outbound;
/// Errors that can occur while interacting with a substream.
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Reading message {0:?} at this stage is a protocol violation")]
BadMessage(Message),
#[error("Failed to write message to substream")]
WriteMessage(#[source] codec::Error),
#[error("Failed to read message from substream")]
ReadMessage(#[source] codec::Error),
#[error("Substream ended unexpectedly mid-protocol")]
UnexpectedEndOfStream,
}
pub type OutboundInEvent = crate::substream_handler::InEvent<outbound::OpenInfo, Void, Void>;
pub type OutboundOutEvent =
crate::substream_handler::OutEvent<Void, outbound::OutEvent, Void, Error>;
pub type InboundInEvent = crate::substream_handler::InEvent<(), inbound::InEvent, Void>;
pub type InboundOutEvent = crate::substream_handler::OutEvent<inbound::OutEvent, Void, Error, Void>;

View File

@ -0,0 +1,189 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::codec::{
Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, RendezvousCodec, Ttl,
};
use crate::handler::Error;
use crate::handler::PROTOCOL_IDENT;
use crate::substream_handler::{Next, PassthroughProtocol, SubstreamHandler};
use asynchronous_codec::Framed;
use futures::{SinkExt, StreamExt};
use libp2p_swarm::{NegotiatedSubstream, SubstreamProtocol};
use std::fmt;
use std::task::{Context, Poll};
/// The state of an inbound substream (i.e. the remote node opened it).
#[allow(clippy::large_enum_variant)]
pub enum Stream {
/// We are in the process of reading a message from the substream.
PendingRead(Framed<NegotiatedSubstream, RendezvousCodec>),
/// We read a message, dispatched it to the behaviour and are waiting for the response.
PendingBehaviour(Framed<NegotiatedSubstream, RendezvousCodec>),
/// We are in the process of sending a response.
PendingSend(Framed<NegotiatedSubstream, RendezvousCodec>, Message),
/// We've sent the message and are now closing down the substream.
PendingClose(Framed<NegotiatedSubstream, RendezvousCodec>),
}
impl fmt::Debug for Stream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Stream::PendingRead(_) => write!(f, "Inbound::PendingRead"),
Stream::PendingBehaviour(_) => write!(f, "Inbound::PendingBehaviour"),
Stream::PendingSend(_, _) => write!(f, "Inbound::PendingSend"),
Stream::PendingClose(_) => write!(f, "Inbound::PendingClose"),
}
}
}
#[derive(Debug, Clone)]
pub enum OutEvent {
RegistrationRequested(NewRegistration),
UnregisterRequested(Namespace),
DiscoverRequested {
namespace: Option<Namespace>,
cookie: Option<Cookie>,
limit: Option<u64>,
},
}
#[derive(Debug)]
pub enum InEvent {
RegisterResponse {
ttl: Ttl,
},
DeclineRegisterRequest(ErrorCode),
DiscoverResponse {
discovered: Vec<Registration>,
cookie: Cookie,
},
DeclineDiscoverRequest(ErrorCode),
}
impl SubstreamHandler for Stream {
type InEvent = InEvent;
type OutEvent = OutEvent;
type Error = Error;
type OpenInfo = ();
fn upgrade(
open_info: Self::OpenInfo,
) -> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo> {
SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info)
}
fn new(substream: NegotiatedSubstream, _: Self::OpenInfo) -> Self {
Stream::PendingRead(Framed::new(substream, RendezvousCodec::default()))
}
fn inject_event(self, event: Self::InEvent) -> Self {
match (event, self) {
(InEvent::RegisterResponse { ttl }, Stream::PendingBehaviour(substream)) => {
Stream::PendingSend(substream, Message::RegisterResponse(Ok(ttl)))
}
(InEvent::DeclineRegisterRequest(error), Stream::PendingBehaviour(substream)) => {
Stream::PendingSend(substream, Message::RegisterResponse(Err(error)))
}
(
InEvent::DiscoverResponse { discovered, cookie },
Stream::PendingBehaviour(substream),
) => Stream::PendingSend(
substream,
Message::DiscoverResponse(Ok((discovered, cookie))),
),
(InEvent::DeclineDiscoverRequest(error), Stream::PendingBehaviour(substream)) => {
Stream::PendingSend(substream, Message::DiscoverResponse(Err(error)))
}
(event, inbound) => {
debug_assert!(false, "{:?} cannot handle event {:?}", inbound, event);
inbound
}
}
}
fn advance(self, cx: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error> {
let next_state = match self {
Stream::PendingRead(mut substream) => {
match substream.poll_next_unpin(cx).map_err(Error::ReadMessage)? {
Poll::Ready(Some(msg)) => {
let event = match msg {
Message::Register(registration) => {
OutEvent::RegistrationRequested(registration)
}
Message::Discover {
cookie,
namespace,
limit,
} => OutEvent::DiscoverRequested {
cookie,
namespace,
limit,
},
Message::Unregister(namespace) => {
OutEvent::UnregisterRequested(namespace)
}
other => return Err(Error::BadMessage(other)),
};
Next::EmitEvent {
event,
next_state: Stream::PendingBehaviour(substream),
}
}
Poll::Ready(None) => return Err(Error::UnexpectedEndOfStream),
Poll::Pending => Next::Pending {
next_state: Stream::PendingRead(substream),
},
}
}
Stream::PendingBehaviour(substream) => Next::Pending {
next_state: Stream::PendingBehaviour(substream),
},
Stream::PendingSend(mut substream, message) => match substream
.poll_ready_unpin(cx)
.map_err(Error::WriteMessage)?
{
Poll::Ready(()) => {
substream
.start_send_unpin(message)
.map_err(Error::WriteMessage)?;
Next::Continue {
next_state: Stream::PendingClose(substream),
}
}
Poll::Pending => Next::Pending {
next_state: Stream::PendingSend(substream, message),
},
},
Stream::PendingClose(mut substream) => match substream.poll_close_unpin(cx) {
Poll::Ready(Ok(())) => Next::Done,
Poll::Ready(Err(_)) => Next::Done, // there is nothing we can do about an error during close
Poll::Pending => Next::Pending {
next_state: Stream::PendingClose(substream),
},
},
};
Ok(next_state)
}
}

View File

@ -0,0 +1,132 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::codec::{Cookie, Message, NewRegistration, RendezvousCodec};
use crate::handler::Error;
use crate::handler::PROTOCOL_IDENT;
use crate::substream_handler::{FutureSubstream, Next, PassthroughProtocol, SubstreamHandler};
use crate::{ErrorCode, Namespace, Registration, Ttl};
use asynchronous_codec::Framed;
use futures::{SinkExt, TryFutureExt, TryStreamExt};
use libp2p_swarm::{NegotiatedSubstream, SubstreamProtocol};
use std::task::Context;
use void::Void;
pub struct Stream(FutureSubstream<OutEvent, Error>);
impl SubstreamHandler for Stream {
type InEvent = Void;
type OutEvent = OutEvent;
type Error = Error;
type OpenInfo = OpenInfo;
fn upgrade(
open_info: Self::OpenInfo,
) -> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo> {
SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info)
}
fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self {
let mut stream = Framed::new(substream, RendezvousCodec::default());
let sent_message = match info {
OpenInfo::RegisterRequest(new_registration) => Message::Register(new_registration),
OpenInfo::UnregisterRequest(namespace) => Message::Unregister(namespace),
OpenInfo::DiscoverRequest {
namespace,
cookie,
limit,
} => Message::Discover {
namespace,
cookie,
limit,
},
};
Self(FutureSubstream::new(async move {
use Message::*;
use OutEvent::*;
stream
.send(sent_message.clone())
.map_err(Error::WriteMessage)
.await?;
let received_message = stream.try_next().map_err(Error::ReadMessage).await?;
let received_message = received_message.ok_or(Error::UnexpectedEndOfStream)?;
let event = match (sent_message, received_message) {
(Register(registration), RegisterResponse(Ok(ttl))) => Registered {
namespace: registration.namespace,
ttl,
},
(Register(registration), RegisterResponse(Err(error))) => {
RegisterFailed(registration.namespace, error)
}
(Discover { .. }, DiscoverResponse(Ok((registrations, cookie)))) => Discovered {
registrations,
cookie,
},
(Discover { namespace, .. }, DiscoverResponse(Err(error))) => {
DiscoverFailed { namespace, error }
}
(.., other) => return Err(Error::BadMessage(other)),
};
stream.close().map_err(Error::WriteMessage).await?;
Ok(event)
}))
}
fn inject_event(self, event: Self::InEvent) -> Self {
void::unreachable(event)
}
fn advance(self, cx: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error> {
Ok(self.0.advance(cx)?.map_state(Stream))
}
}
#[derive(Debug, Clone)]
pub enum OutEvent {
Registered {
namespace: Namespace,
ttl: Ttl,
},
RegisterFailed(Namespace, ErrorCode),
Discovered {
registrations: Vec<Registration>,
cookie: Cookie,
},
DiscoverFailed {
namespace: Option<Namespace>,
error: ErrorCode,
},
}
#[derive(Debug)]
pub enum OpenInfo {
RegisterRequest(NewRegistration),
UnregisterRequest(Namespace),
DiscoverRequest {
namespace: Option<Namespace>,
cookie: Option<Cookie>,
limit: Option<Ttl>,
},
}

View File

@ -0,0 +1,43 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
pub use self::codec::{ErrorCode, Namespace, NamespaceTooLong, Registration, Ttl};
mod codec;
mod handler;
mod substream_handler;
/// If unspecified, rendezvous nodes should assume a TTL of 2h.
///
/// See <https://github.com/libp2p/specs/blob/d21418638d5f09f2a4e5a1ceca17058df134a300/rendezvous/README.md#L116-L117>.
pub const DEFAULT_TTL: Ttl = 60 * 60 * 2;
/// By default, nodes should require a minimum TTL of 2h
///
/// <https://github.com/libp2p/specs/tree/master/rendezvous#recommendations-for-rendezvous-points-configurations>.
pub const MIN_TTL: Ttl = 60 * 60 * 2;
/// By default, nodes should allow a maximum TTL of 72h
///
/// <https://github.com/libp2p/specs/tree/master/rendezvous#recommendations-for-rendezvous-points-configurations>.
pub const MAX_TTL: Ttl = 60 * 60 * 72;
pub mod client;
pub mod server;

View File

@ -0,0 +1,61 @@
syntax = "proto2";
package rendezvous.pb;
message Message {
enum MessageType {
REGISTER = 0;
REGISTER_RESPONSE = 1;
UNREGISTER = 2;
DISCOVER = 3;
DISCOVER_RESPONSE = 4;
}
enum ResponseStatus {
OK = 0;
E_INVALID_NAMESPACE = 100;
E_INVALID_SIGNED_PEER_RECORD = 101;
E_INVALID_TTL = 102;
E_INVALID_COOKIE = 103;
E_NOT_AUTHORIZED = 200;
E_INTERNAL_ERROR = 300;
E_UNAVAILABLE = 400;
}
message Register {
optional string ns = 1;
optional bytes signedPeerRecord = 2;
optional uint64 ttl = 3; // in seconds
}
message RegisterResponse {
optional ResponseStatus status = 1;
optional string statusText = 2;
optional uint64 ttl = 3; // in seconds
}
message Unregister {
optional string ns = 1;
optional bytes id = 2;
}
message Discover {
optional string ns = 1;
optional uint64 limit = 2;
optional bytes cookie = 3;
}
message DiscoverResponse {
repeated Register registrations = 1;
optional bytes cookie = 2;
optional ResponseStatus status = 3;
optional string statusText = 4;
}
optional MessageType type = 1;
optional Register register = 2;
optional RegisterResponse registerResponse = 3;
optional Unregister unregister = 4;
optional Discover discover = 5;
optional DiscoverResponse discoverResponse = 6;
}

View File

@ -0,0 +1,764 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl};
use crate::handler::inbound;
use crate::substream_handler::{InboundSubstreamId, SubstreamProtocolsHandler};
use crate::{handler, MAX_TTL, MIN_TTL};
use bimap::BiMap;
use futures::future::BoxFuture;
use futures::ready;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use libp2p_core::connection::ConnectionId;
use libp2p_core::PeerId;
use libp2p_swarm::{
CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::iter::FromIterator;
use std::task::{Context, Poll};
use std::time::Duration;
use void::Void;
pub struct Behaviour {
events: VecDeque<
NetworkBehaviourAction<Event, SubstreamProtocolsHandler<inbound::Stream, Void, ()>>,
>,
registrations: Registrations,
}
pub struct Config {
min_ttl: Ttl,
max_ttl: Ttl,
}
impl Config {
pub fn with_min_ttl(mut self, min_ttl: Ttl) -> Self {
self.min_ttl = min_ttl;
self
}
pub fn with_max_ttl(mut self, max_ttl: Ttl) -> Self {
self.max_ttl = max_ttl;
self
}
}
impl Default for Config {
fn default() -> Self {
Self {
min_ttl: MIN_TTL,
max_ttl: MAX_TTL,
}
}
}
impl Behaviour {
/// Create a new instance of the rendezvous [`NetworkBehaviour`].
pub fn new(config: Config) -> Self {
Self {
events: Default::default(),
registrations: Registrations::with_config(config),
}
}
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
/// We successfully served a discover request from a peer.
DiscoverServed {
enquirer: PeerId,
registrations: Vec<Registration>,
},
/// We failed to serve a discover request for a peer.
DiscoverNotServed { enquirer: PeerId, error: ErrorCode },
/// A peer successfully registered with us.
PeerRegistered {
peer: PeerId,
registration: Registration,
},
/// We declined a registration from a peer.
PeerNotRegistered {
peer: PeerId,
namespace: Namespace,
error: ErrorCode,
},
/// A peer successfully unregistered with us.
PeerUnregistered { peer: PeerId, namespace: Namespace },
/// A registration from a peer expired.
RegistrationExpired(Registration),
}
impl NetworkBehaviour for Behaviour {
type ProtocolsHandler = SubstreamProtocolsHandler<inbound::Stream, Void, ()>;
type OutEvent = Event;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
let initial_keep_alive = Duration::from_secs(30);
SubstreamProtocolsHandler::new_inbound_only(initial_keep_alive)
}
fn inject_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: handler::InboundOutEvent,
) {
let new_events = match event {
handler::InboundOutEvent::InboundEvent { id, message } => {
handle_inbound_event(message, peer_id, connection, id, &mut self.registrations)
}
handler::InboundOutEvent::OutboundEvent { message, .. } => void::unreachable(message),
handler::InboundOutEvent::InboundError { error, .. } => {
log::warn!("Connection with peer {} failed: {}", peer_id, error);
vec![NetworkBehaviourAction::CloseConnection {
peer_id,
connection: CloseConnection::One(connection),
}]
}
handler::InboundOutEvent::OutboundError { error, .. } => void::unreachable(error),
};
self.events.extend(new_events);
}
fn poll(
&mut self,
cx: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if let Poll::Ready(ExpiredRegistration(registration)) = self.registrations.poll(cx) {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
Event::RegistrationExpired(registration),
));
}
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
Poll::Pending
}
}
fn handle_inbound_event(
event: inbound::OutEvent,
peer_id: PeerId,
connection: ConnectionId,
id: InboundSubstreamId,
registrations: &mut Registrations,
) -> Vec<NetworkBehaviourAction<Event, SubstreamProtocolsHandler<inbound::Stream, Void, ()>>> {
match event {
// bad registration
inbound::OutEvent::RegistrationRequested(registration)
if registration.record.peer_id() != peer_id =>
{
let error = ErrorCode::NotAuthorized;
vec![
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream {
id,
message: inbound::InEvent::DeclineRegisterRequest(error),
},
},
NetworkBehaviourAction::GenerateEvent(Event::PeerNotRegistered {
peer: peer_id,
namespace: registration.namespace,
error,
}),
]
}
inbound::OutEvent::RegistrationRequested(registration) => {
let namespace = registration.namespace.clone();
match registrations.add(registration) {
Ok(registration) => {
vec![
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream {
id,
message: inbound::InEvent::RegisterResponse {
ttl: registration.ttl,
},
},
},
NetworkBehaviourAction::GenerateEvent(Event::PeerRegistered {
peer: peer_id,
registration,
}),
]
}
Err(TtlOutOfRange::TooLong { .. }) | Err(TtlOutOfRange::TooShort { .. }) => {
let error = ErrorCode::InvalidTtl;
vec![
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream {
id,
message: inbound::InEvent::DeclineRegisterRequest(error),
},
},
NetworkBehaviourAction::GenerateEvent(Event::PeerNotRegistered {
peer: peer_id,
namespace,
error,
}),
]
}
}
}
inbound::OutEvent::DiscoverRequested {
namespace,
cookie,
limit,
} => match registrations.get(namespace, cookie, limit) {
Ok((registrations, cookie)) => {
let discovered = registrations.cloned().collect::<Vec<_>>();
vec![
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream {
id,
message: inbound::InEvent::DiscoverResponse {
discovered: discovered.clone(),
cookie,
},
},
},
NetworkBehaviourAction::GenerateEvent(Event::DiscoverServed {
enquirer: peer_id,
registrations: discovered,
}),
]
}
Err(_) => {
let error = ErrorCode::InvalidCookie;
vec![
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream {
id,
message: inbound::InEvent::DeclineDiscoverRequest(error),
},
},
NetworkBehaviourAction::GenerateEvent(Event::DiscoverNotServed {
enquirer: peer_id,
error,
}),
]
}
},
inbound::OutEvent::UnregisterRequested(namespace) => {
registrations.remove(namespace.clone(), peer_id);
vec![NetworkBehaviourAction::GenerateEvent(
Event::PeerUnregistered {
peer: peer_id,
namespace,
},
)]
}
}
}
#[derive(Debug, Eq, PartialEq, Hash, Copy, Clone)]
struct RegistrationId(u64);
impl RegistrationId {
fn new() -> Self {
Self(rand::random())
}
}
#[derive(Debug, PartialEq)]
struct ExpiredRegistration(Registration);
pub struct Registrations {
registrations_for_peer: BiMap<(PeerId, Namespace), RegistrationId>,
registrations: HashMap<RegistrationId, Registration>,
cookies: HashMap<Cookie, HashSet<RegistrationId>>,
min_ttl: Ttl,
max_ttl: Ttl,
next_expiry: FuturesUnordered<BoxFuture<'static, RegistrationId>>,
}
#[derive(Debug, thiserror::Error)]
pub enum TtlOutOfRange {
#[error("Requested TTL ({requested}s) is too long; max {bound}s")]
TooLong { bound: Ttl, requested: Ttl },
#[error("Requested TTL ({requested}s) is too short; min {bound}s")]
TooShort { bound: Ttl, requested: Ttl },
}
impl Default for Registrations {
fn default() -> Self {
Registrations::with_config(Config::default())
}
}
impl Registrations {
pub fn with_config(config: Config) -> Self {
Self {
registrations_for_peer: Default::default(),
registrations: Default::default(),
min_ttl: config.min_ttl,
max_ttl: config.max_ttl,
cookies: Default::default(),
next_expiry: FuturesUnordered::from_iter(vec![futures::future::pending().boxed()]),
}
}
pub fn add(
&mut self,
new_registration: NewRegistration,
) -> Result<Registration, TtlOutOfRange> {
let ttl = new_registration.effective_ttl();
if ttl > self.max_ttl {
return Err(TtlOutOfRange::TooLong {
bound: self.max_ttl,
requested: ttl,
});
}
if ttl < self.min_ttl {
return Err(TtlOutOfRange::TooShort {
bound: self.min_ttl,
requested: ttl,
});
}
let namespace = new_registration.namespace;
let registration_id = RegistrationId::new();
if let Some(old_registration) = self
.registrations_for_peer
.get_by_left(&(new_registration.record.peer_id(), namespace.clone()))
{
self.registrations.remove(old_registration);
}
self.registrations_for_peer.insert(
(new_registration.record.peer_id(), namespace.clone()),
registration_id,
);
let registration = Registration {
namespace,
record: new_registration.record,
ttl,
};
self.registrations
.insert(registration_id, registration.clone());
let next_expiry = wasm_timer::Delay::new(Duration::from_secs(ttl as u64))
.map(move |result| {
if result.is_err() {
log::warn!("Timer for registration {} has unexpectedly errored, treating it as expired", registration_id.0);
}
registration_id
})
.boxed();
self.next_expiry.push(next_expiry);
Ok(registration)
}
pub fn remove(&mut self, namespace: Namespace, peer_id: PeerId) {
let reggo_to_remove = self
.registrations_for_peer
.remove_by_left(&(peer_id, namespace));
if let Some((_, reggo_to_remove)) = reggo_to_remove {
self.registrations.remove(&reggo_to_remove);
}
}
pub fn get(
&mut self,
discover_namespace: Option<Namespace>,
cookie: Option<Cookie>,
limit: Option<u64>,
) -> Result<(impl Iterator<Item = &Registration> + '_, Cookie), CookieNamespaceMismatch> {
let cookie_namespace = cookie.as_ref().and_then(|cookie| cookie.namespace());
match (discover_namespace.as_ref(), cookie_namespace) {
// discover all namespace but cookie is specific to a namespace? => bad
(None, Some(_)) => return Err(CookieNamespaceMismatch),
// discover for a namespace but cookie is for a different namesapce? => bad
(Some(namespace), Some(cookie_namespace)) if namespace != cookie_namespace => {
return Err(CookieNamespaceMismatch)
}
// every other combination is fine
_ => {}
}
let mut reggos_of_last_discover = cookie
.and_then(|cookie| self.cookies.get(&cookie))
.cloned()
.unwrap_or_default();
let ids = self
.registrations_for_peer
.iter()
.filter_map({
|((_, namespace), registration_id)| {
if reggos_of_last_discover.contains(registration_id) {
return None;
}
match discover_namespace.as_ref() {
Some(discover_namespace) if discover_namespace == namespace => {
Some(registration_id)
}
Some(_) => None,
None => Some(registration_id),
}
}
})
.take(limit.unwrap_or(u64::MAX) as usize)
.cloned()
.collect::<Vec<_>>();
reggos_of_last_discover.extend(&ids);
let new_cookie = discover_namespace
.map(Cookie::for_namespace)
.unwrap_or_else(Cookie::for_all_namespaces);
self.cookies
.insert(new_cookie.clone(), reggos_of_last_discover);
let reggos = &self.registrations;
let registrations = ids
.into_iter()
.map(move |id| reggos.get(&id).expect("bad internal datastructure"));
Ok((registrations, new_cookie))
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ExpiredRegistration> {
let expired_registration = ready!(self.next_expiry.poll_next_unpin(cx)).expect(
"This stream should never finish because it is initialised with a pending future",
);
// clean up our cookies
self.cookies.retain(|_, registrations| {
registrations.remove(&expired_registration);
// retain all cookies where there are still registrations left
!registrations.is_empty()
});
self.registrations_for_peer
.remove_by_right(&expired_registration);
match self.registrations.remove(&expired_registration) {
None => self.poll(cx),
Some(registration) => Poll::Ready(ExpiredRegistration(registration)),
}
}
}
#[derive(Debug, thiserror::Error, Eq, PartialEq)]
#[error("The provided cookie is not valid for a DISCOVER request for the given namespace")]
pub struct CookieNamespaceMismatch;
#[cfg(test)]
mod tests {
use std::option::Option::None;
use std::time::SystemTime;
use libp2p_core::{identity, PeerRecord};
use super::*;
#[test]
fn given_cookie_from_discover_when_discover_again_then_only_get_diff() {
let mut registrations = Registrations::default();
registrations.add(new_dummy_registration("foo")).unwrap();
registrations.add(new_dummy_registration("foo")).unwrap();
let (initial_discover, cookie) = registrations.get(None, None, None).unwrap();
assert_eq!(initial_discover.count(), 2);
let (subsequent_discover, _) = registrations.get(None, Some(cookie), None).unwrap();
assert_eq!(subsequent_discover.count(), 0);
}
#[test]
fn given_registrations_when_discover_all_then_all_are_returned() {
let mut registrations = Registrations::default();
registrations.add(new_dummy_registration("foo")).unwrap();
registrations.add(new_dummy_registration("foo")).unwrap();
let (discover, _) = registrations.get(None, None, None).unwrap();
assert_eq!(discover.count(), 2);
}
#[test]
fn given_registrations_when_discover_only_for_specific_namespace_then_only_those_are_returned()
{
let mut registrations = Registrations::default();
registrations.add(new_dummy_registration("foo")).unwrap();
registrations.add(new_dummy_registration("bar")).unwrap();
let (discover, _) = registrations
.get(Some(Namespace::from_static("foo")), None, None)
.unwrap();
assert_eq!(
discover.map(|r| &r.namespace).collect::<Vec<_>>(),
vec!["foo"]
);
}
#[test]
fn given_reregistration_old_registration_is_discarded() {
let alice = identity::Keypair::generate_ed25519();
let mut registrations = Registrations::default();
registrations
.add(new_registration("foo", alice.clone(), None))
.unwrap();
registrations
.add(new_registration("foo", alice, None))
.unwrap();
let (discover, _) = registrations
.get(Some(Namespace::from_static("foo")), None, None)
.unwrap();
assert_eq!(
discover.map(|r| &r.namespace).collect::<Vec<_>>(),
vec!["foo"]
);
}
#[test]
fn given_cookie_from_2nd_discover_does_not_return_nodes_from_first_discover() {
let mut registrations = Registrations::default();
registrations.add(new_dummy_registration("foo")).unwrap();
registrations.add(new_dummy_registration("foo")).unwrap();
let (initial_discover, cookie1) = registrations.get(None, None, None).unwrap();
assert_eq!(initial_discover.count(), 2);
let (subsequent_discover, cookie2) = registrations.get(None, Some(cookie1), None).unwrap();
assert_eq!(subsequent_discover.count(), 0);
let (subsequent_discover, _) = registrations.get(None, Some(cookie2), None).unwrap();
assert_eq!(subsequent_discover.count(), 0);
}
#[test]
fn cookie_from_different_discover_request_is_not_valid() {
let mut registrations = Registrations::default();
registrations.add(new_dummy_registration("foo")).unwrap();
registrations.add(new_dummy_registration("bar")).unwrap();
let (_, foo_discover_cookie) = registrations
.get(Some(Namespace::from_static("foo")), None, None)
.unwrap();
let result = registrations.get(
Some(Namespace::from_static("bar")),
Some(foo_discover_cookie),
None,
);
assert!(matches!(result, Err(CookieNamespaceMismatch)))
}
#[tokio::test]
async fn given_two_registration_ttls_one_expires_one_lives() {
let mut registrations = Registrations::with_config(Config {
min_ttl: 0,
max_ttl: 4,
});
let start_time = SystemTime::now();
registrations
.add(new_dummy_registration_with_ttl("foo", 1))
.unwrap();
registrations
.add(new_dummy_registration_with_ttl("bar", 4))
.unwrap();
let event = registrations.next_event().await;
let elapsed = start_time.elapsed().unwrap();
assert!(elapsed.as_secs() >= 1);
assert!(elapsed.as_secs() < 2);
assert_eq!(event.0.namespace, Namespace::from_static("foo"));
{
let (mut discovered_foo, _) = registrations
.get(Some(Namespace::from_static("foo")), None, None)
.unwrap();
assert!(discovered_foo.next().is_none());
}
let (mut discovered_bar, _) = registrations
.get(Some(Namespace::from_static("bar")), None, None)
.unwrap();
assert!(discovered_bar.next().is_some());
}
#[tokio::test]
async fn given_peer_unregisters_before_expiry_do_not_emit_registration_expired() {
let mut registrations = Registrations::with_config(Config {
min_ttl: 1,
max_ttl: 10,
});
let dummy_registration = new_dummy_registration_with_ttl("foo", 2);
let namespace = dummy_registration.namespace.clone();
let peer_id = dummy_registration.record.peer_id();
registrations.add(dummy_registration).unwrap();
registrations.no_event_for(1).await;
registrations.remove(namespace, peer_id);
registrations.no_event_for(3).await
}
/// FuturesUnordered stop polling for ready futures when poll_next() is called until a None
/// value is returned. To prevent the next_expiry future from going to "sleep", next_expiry
/// is initialised with a future that always returns pending. This test ensures that
/// FuturesUnordered does not stop polling for ready futures.
#[tokio::test]
async fn given_all_registrations_expired_then_successfully_handle_new_registration_and_expiry()
{
let mut registrations = Registrations::with_config(Config {
min_ttl: 0,
max_ttl: 10,
});
let dummy_registration = new_dummy_registration_with_ttl("foo", 1);
registrations.add(dummy_registration.clone()).unwrap();
let _ = registrations.next_event_in_at_most(2).await;
registrations.no_event_for(1).await;
registrations.add(dummy_registration).unwrap();
let _ = registrations.next_event_in_at_most(2).await;
}
#[tokio::test]
async fn cookies_are_cleaned_up_if_registrations_expire() {
let mut registrations = Registrations::with_config(Config {
min_ttl: 1,
max_ttl: 10,
});
registrations
.add(new_dummy_registration_with_ttl("foo", 2))
.unwrap();
let (_, _) = registrations.get(None, None, None).unwrap();
assert_eq!(registrations.cookies.len(), 1);
let _ = registrations.next_event_in_at_most(3).await;
assert_eq!(registrations.cookies.len(), 0);
}
#[test]
fn given_limit_discover_only_returns_n_results() {
let mut registrations = Registrations::default();
registrations.add(new_dummy_registration("foo")).unwrap();
registrations.add(new_dummy_registration("foo")).unwrap();
let (registrations, _) = registrations.get(None, None, Some(1)).unwrap();
assert_eq!(registrations.count(), 1);
}
#[test]
fn given_limit_cookie_can_be_used_for_pagination() {
let mut registrations = Registrations::default();
registrations.add(new_dummy_registration("foo")).unwrap();
registrations.add(new_dummy_registration("foo")).unwrap();
let (discover1, cookie) = registrations.get(None, None, Some(1)).unwrap();
assert_eq!(discover1.count(), 1);
let (discover2, _) = registrations.get(None, Some(cookie), None).unwrap();
assert_eq!(discover2.count(), 1);
}
fn new_dummy_registration(namespace: &'static str) -> NewRegistration {
let identity = identity::Keypair::generate_ed25519();
new_registration(namespace, identity, None)
}
fn new_dummy_registration_with_ttl(namespace: &'static str, ttl: Ttl) -> NewRegistration {
let identity = identity::Keypair::generate_ed25519();
new_registration(namespace, identity, Some(ttl))
}
fn new_registration(
namespace: &'static str,
identity: identity::Keypair,
ttl: Option<Ttl>,
) -> NewRegistration {
NewRegistration::new(
Namespace::from_static(namespace),
PeerRecord::new(identity, vec!["/ip4/127.0.0.1/tcp/1234".parse().unwrap()]).unwrap(),
ttl,
)
}
/// Defines utility functions that make the tests more readable.
impl Registrations {
async fn next_event(&mut self) -> ExpiredRegistration {
futures::future::poll_fn(|cx| self.poll(cx)).await
}
/// Polls [`Registrations`] for `seconds` and panics if it returns a event during this time.
async fn no_event_for(&mut self, seconds: u64) {
tokio::time::timeout(Duration::from_secs(seconds), self.next_event())
.await
.unwrap_err();
}
/// Polls [`Registrations`] for at most `seconds` and panics if doesn't return an event within that time.
async fn next_event_in_at_most(&mut self, seconds: u64) -> ExpiredRegistration {
tokio::time::timeout(Duration::from_secs(seconds), self.next_event())
.await
.unwrap()
}
}
}

View File

@ -0,0 +1,551 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! A generic [`ProtocolsHandler`] that delegates the handling of substreams to [`SubstreamHandler`]s.
//!
//! This module is an attempt to simplify the implementation of protocols by freeing implementations from dealing with aspects such as concurrent substreams.
//! Particularly for outbound substreams, it greatly simplifies the definition of protocols through the [`FutureSubstream`] helper.
//!
//! At the moment, this module is an implementation detail of the rendezvous protocol but the intent is for it to be provided as a generic module that is accessible to other protocols as well.
use futures::future::{self, BoxFuture, Fuse, FusedFuture};
use futures::FutureExt;
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_swarm::protocols_handler::{InboundUpgradeSend, OutboundUpgradeSend};
use libp2p_swarm::{
KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::future::Future;
use std::hash::Hash;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use void::Void;
/// Handles a substream throughout its lifetime.
pub trait SubstreamHandler: Sized {
type InEvent;
type OutEvent;
type Error;
type OpenInfo;
fn upgrade(open_info: Self::OpenInfo)
-> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo>;
fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self;
fn inject_event(self, event: Self::InEvent) -> Self;
fn advance(self, cx: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error>;
}
/// The result of advancing a [`SubstreamHandler`].
pub enum Next<TState, TEvent> {
/// Return the given event and set the handler into `next_state`.
EmitEvent { event: TEvent, next_state: TState },
/// The handler currently cannot do any more work, set its state back into `next_state`.
Pending { next_state: TState },
/// The handler performed some work and wants to continue in the given state.
///
/// This variant is useful because it frees the handler from implementing a loop internally.
Continue { next_state: TState },
/// The handler finished.
Done,
}
impl<TState, TEvent> Next<TState, TEvent> {
pub fn map_state<TNextState>(
self,
map: impl FnOnce(TState) -> TNextState,
) -> Next<TNextState, TEvent> {
match self {
Next::EmitEvent { event, next_state } => Next::EmitEvent {
event,
next_state: map(next_state),
},
Next::Pending { next_state } => Next::Pending {
next_state: map(next_state),
},
Next::Continue { next_state } => Next::Pending {
next_state: map(next_state),
},
Next::Done => Next::Done,
}
}
}
#[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)]
pub struct InboundSubstreamId(u64);
impl InboundSubstreamId {
fn fetch_and_increment(&mut self) -> Self {
let next_id = *self;
self.0 += 1;
next_id
}
}
impl fmt::Display for InboundSubstreamId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)]
pub struct OutboundSubstreamId(u64);
impl OutboundSubstreamId {
fn fetch_and_increment(&mut self) -> Self {
let next_id = *self;
self.0 += 1;
next_id
}
}
impl fmt::Display for OutboundSubstreamId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
pub struct PassthroughProtocol {
ident: Option<&'static [u8]>,
}
impl PassthroughProtocol {
pub fn new(ident: &'static [u8]) -> Self {
Self { ident: Some(ident) }
}
}
impl UpgradeInfo for PassthroughProtocol {
type Info = &'static [u8];
type InfoIter = std::option::IntoIter<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
self.ident.into_iter()
}
}
impl<C: Send + 'static> InboundUpgrade<C> for PassthroughProtocol {
type Output = C;
type Error = Void;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
match self.ident {
Some(_) => future::ready(Ok(socket)).boxed(),
None => future::pending().boxed(),
}
}
}
impl<C: Send + 'static> OutboundUpgrade<C> for PassthroughProtocol {
type Output = C;
type Error = Void;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
match self.ident {
Some(_) => future::ready(Ok(socket)).boxed(),
None => future::pending().boxed(),
}
}
}
/// An implementation of [`ProtocolsHandler`] that delegates to individual [`SubstreamHandler`]s.
pub struct SubstreamProtocolsHandler<TInboundSubstream, TOutboundSubstream, TOutboundOpenInfo> {
inbound_substreams: HashMap<InboundSubstreamId, TInboundSubstream>,
outbound_substreams: HashMap<OutboundSubstreamId, TOutboundSubstream>,
next_inbound_substream_id: InboundSubstreamId,
next_outbound_substream_id: OutboundSubstreamId,
new_substreams: VecDeque<TOutboundOpenInfo>,
initial_keep_alive_deadline: Instant,
}
impl<TInboundSubstream, TOutboundSubstream, TOutboundOpenInfo>
SubstreamProtocolsHandler<TInboundSubstream, TOutboundSubstream, TOutboundOpenInfo>
{
pub fn new(initial_keep_alive: Duration) -> Self {
Self {
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
next_inbound_substream_id: InboundSubstreamId(0),
next_outbound_substream_id: OutboundSubstreamId(0),
new_substreams: Default::default(),
initial_keep_alive_deadline: Instant::now() + initial_keep_alive,
}
}
}
impl<TOutboundSubstream, TOutboundOpenInfo>
SubstreamProtocolsHandler<void::Void, TOutboundSubstream, TOutboundOpenInfo>
{
pub fn new_outbound_only(initial_keep_alive: Duration) -> Self {
Self {
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
next_inbound_substream_id: InboundSubstreamId(0),
next_outbound_substream_id: OutboundSubstreamId(0),
new_substreams: Default::default(),
initial_keep_alive_deadline: Instant::now() + initial_keep_alive,
}
}
}
impl<TInboundSubstream, TOutboundOpenInfo>
SubstreamProtocolsHandler<TInboundSubstream, void::Void, TOutboundOpenInfo>
{
pub fn new_inbound_only(initial_keep_alive: Duration) -> Self {
Self {
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
next_inbound_substream_id: InboundSubstreamId(0),
next_outbound_substream_id: OutboundSubstreamId(0),
new_substreams: Default::default(),
initial_keep_alive_deadline: Instant::now() + initial_keep_alive,
}
}
}
/// Poll all substreams within the given HashMap.
///
/// This is defined as a separate function because we call it with two different fields stored within [`SubstreamProtocolsHandler`].
fn poll_substreams<TId, TSubstream, TError, TOutEvent>(
substreams: &mut HashMap<TId, TSubstream>,
cx: &mut Context<'_>,
) -> Poll<Result<(TId, TOutEvent), (TId, TError)>>
where
TSubstream: SubstreamHandler<OutEvent = TOutEvent, Error = TError>,
TId: Copy + Eq + Hash + fmt::Display,
{
let substream_ids = substreams.keys().copied().collect::<Vec<_>>();
'loop_substreams: for id in substream_ids {
let mut handler = substreams
.remove(&id)
.expect("we just got the key out of the map");
let (next_state, poll) = 'loop_handler: loop {
match handler.advance(cx) {
Ok(Next::EmitEvent { next_state, event }) => {
break (next_state, Poll::Ready(Ok((id, event))))
}
Ok(Next::Pending { next_state }) => break (next_state, Poll::Pending),
Ok(Next::Continue { next_state }) => {
handler = next_state;
continue 'loop_handler;
}
Ok(Next::Done) => {
log::debug!("Substream handler {} finished", id);
continue 'loop_substreams;
}
Err(e) => return Poll::Ready(Err((id, e))),
}
};
substreams.insert(id, next_state);
return poll;
}
Poll::Pending
}
/// Event sent from the [`libp2p_swarm::NetworkBehaviour`] to the [`SubstreamProtocolsHandler`].
#[derive(Debug)]
pub enum InEvent<I, TInboundEvent, TOutboundEvent> {
/// Open a new substream using the provided `open_info`.
///
/// For "client-server" protocols, this is typically the initial message to be sent to the other party.
NewSubstream { open_info: I },
NotifyInboundSubstream {
id: InboundSubstreamId,
message: TInboundEvent,
},
NotifyOutboundSubstream {
id: OutboundSubstreamId,
message: TOutboundEvent,
},
}
/// Event produced by the [`SubstreamProtocolsHandler`] for the corresponding [`libp2p_swarm::NetworkBehaviour`].
#[derive(Debug)]
pub enum OutEvent<TInbound, TOutbound, TInboundError, TOutboundError> {
/// An inbound substream produced an event.
InboundEvent {
id: InboundSubstreamId,
message: TInbound,
},
/// An outbound substream produced an event.
OutboundEvent {
id: OutboundSubstreamId,
message: TOutbound,
},
/// An inbound substream errored irrecoverably.
InboundError {
id: InboundSubstreamId,
error: TInboundError,
},
/// An outbound substream errored irrecoverably.
OutboundError {
id: OutboundSubstreamId,
error: TOutboundError,
},
}
impl<
TInboundInEvent,
TInboundOutEvent,
TOutboundInEvent,
TOutboundOutEvent,
TOutboundOpenInfo,
TInboundError,
TOutboundError,
TInboundSubstreamHandler,
TOutboundSubstreamHandler,
> ProtocolsHandler
for SubstreamProtocolsHandler<
TInboundSubstreamHandler,
TOutboundSubstreamHandler,
TOutboundOpenInfo,
>
where
TInboundSubstreamHandler: SubstreamHandler<
InEvent = TInboundInEvent,
OutEvent = TInboundOutEvent,
Error = TInboundError,
OpenInfo = (),
>,
TOutboundSubstreamHandler: SubstreamHandler<
InEvent = TOutboundInEvent,
OutEvent = TOutboundOutEvent,
Error = TOutboundError,
OpenInfo = TOutboundOpenInfo,
>,
TInboundInEvent: fmt::Debug + Send + 'static,
TInboundOutEvent: fmt::Debug + Send + 'static,
TOutboundInEvent: fmt::Debug + Send + 'static,
TOutboundOutEvent: fmt::Debug + Send + 'static,
TOutboundOpenInfo: fmt::Debug + Send + 'static,
TInboundError: fmt::Debug + Send + 'static,
TOutboundError: fmt::Debug + Send + 'static,
TInboundSubstreamHandler: Send + 'static,
TOutboundSubstreamHandler: Send + 'static,
{
type InEvent = InEvent<TOutboundOpenInfo, TInboundInEvent, TOutboundInEvent>;
type OutEvent = OutEvent<TInboundOutEvent, TOutboundOutEvent, TInboundError, TOutboundError>;
type Error = Void;
type InboundProtocol = PassthroughProtocol;
type OutboundProtocol = PassthroughProtocol;
type InboundOpenInfo = ();
type OutboundOpenInfo = TOutboundOpenInfo;
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
TInboundSubstreamHandler::upgrade(())
}
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output,
_: Self::InboundOpenInfo,
) {
self.inbound_substreams.insert(
self.next_inbound_substream_id.fetch_and_increment(),
TInboundSubstreamHandler::new(protocol, ()),
);
}
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
info: Self::OutboundOpenInfo,
) {
self.outbound_substreams.insert(
self.next_outbound_substream_id.fetch_and_increment(),
TOutboundSubstreamHandler::new(protocol, info),
);
}
fn inject_event(&mut self, event: Self::InEvent) {
match event {
InEvent::NewSubstream { open_info } => self.new_substreams.push_back(open_info),
InEvent::NotifyInboundSubstream { id, message } => {
match self.inbound_substreams.remove(&id) {
Some(handler) => {
let new_handler = handler.inject_event(message);
self.inbound_substreams.insert(id, new_handler);
}
None => {
log::debug!("Substream with ID {} not found", id);
}
}
}
InEvent::NotifyOutboundSubstream { id, message } => {
match self.outbound_substreams.remove(&id) {
Some(handler) => {
let new_handler = handler.inject_event(message);
self.outbound_substreams.insert(id, new_handler);
}
None => {
log::debug!("Substream with ID {} not found", id);
}
}
}
}
}
fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
_: ProtocolsHandlerUpgrErr<Void>,
) {
// TODO: Handle upgrade errors properly
}
fn connection_keep_alive(&self) -> KeepAlive {
// Rudimentary keep-alive handling, to be extended as needed as this abstraction is used more by other protocols.
if Instant::now() < self.initial_keep_alive_deadline {
return KeepAlive::Yes;
}
if self.inbound_substreams.is_empty()
&& self.outbound_substreams.is_empty()
&& self.new_substreams.is_empty()
{
return KeepAlive::No;
}
KeepAlive::Yes
}
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
if let Some(open_info) = self.new_substreams.pop_front() {
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: TOutboundSubstreamHandler::upgrade(open_info),
});
}
match poll_substreams(&mut self.inbound_substreams, cx) {
Poll::Ready(Ok((id, message))) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(OutEvent::InboundEvent {
id,
message,
}))
}
Poll::Ready(Err((id, error))) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(OutEvent::InboundError {
id,
error,
}))
}
Poll::Pending => {}
}
match poll_substreams(&mut self.outbound_substreams, cx) {
Poll::Ready(Ok((id, message))) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(OutEvent::OutboundEvent {
id,
message,
}))
}
Poll::Ready(Err((id, error))) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(OutEvent::OutboundError {
id,
error,
}))
}
Poll::Pending => {}
}
Poll::Pending
}
}
/// A helper struct for substream handlers that can be implemented as async functions.
///
/// This only works for substreams without an `InEvent` because - once constructed - the state of an inner future is opaque.
pub struct FutureSubstream<TOutEvent, TError> {
future: Fuse<BoxFuture<'static, Result<TOutEvent, TError>>>,
}
impl<TOutEvent, TError> FutureSubstream<TOutEvent, TError> {
pub fn new(future: impl Future<Output = Result<TOutEvent, TError>> + Send + 'static) -> Self {
Self {
future: future.boxed().fuse(),
}
}
pub fn advance(mut self, cx: &mut Context<'_>) -> Result<Next<Self, TOutEvent>, TError> {
if self.future.is_terminated() {
return Ok(Next::Done);
}
match self.future.poll_unpin(cx) {
Poll::Ready(Ok(event)) => Ok(Next::EmitEvent {
event,
next_state: self,
}),
Poll::Ready(Err(error)) => Err(error),
Poll::Pending => Ok(Next::Pending { next_state: self }),
}
}
}
impl SubstreamHandler for void::Void {
type InEvent = void::Void;
type OutEvent = void::Void;
type Error = void::Void;
type OpenInfo = ();
fn new(_: NegotiatedSubstream, _: Self::OpenInfo) -> Self {
unreachable!("we should never yield a substream")
}
fn inject_event(self, event: Self::InEvent) -> Self {
void::unreachable(event)
}
fn advance(self, _: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error> {
void::unreachable(self)
}
fn upgrade(
open_info: Self::OpenInfo,
) -> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo> {
SubstreamProtocol::new(PassthroughProtocol { ident: None }, open_info)
}
}