refactor(rendezvous): rewrite using libp2p-request-response

Fixes [#3878](https://github.com/libp2p/rust-libp2p/issues/3878).

Pull-Request: #4051.


  
Co-Authored-By: Thomas Eizinger <thomas@eizinger.io>
This commit is contained in:
Denis Garus 2023-06-14 20:49:23 +03:00 committed by GitHub
parent 14530af261
commit b8ceeccdc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 560 additions and 1342 deletions

1
Cargo.lock generated
View File

@ -2881,6 +2881,7 @@ dependencies = [
"libp2p-identity",
"libp2p-noise",
"libp2p-ping",
"libp2p-request-response",
"libp2p-swarm",
"libp2p-swarm-test",
"libp2p-tcp",

View File

@ -12,6 +12,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
asynchronous-codec = "0.6"
async-trait = "0.1"
bimap = "0.6.3"
futures = { version = "0.3", default-features = false, features = ["std"] }
futures-timer = "3.0.2"
@ -19,6 +20,7 @@ instant = "0.1.12"
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
libp2p-request-response = { workspace = true }
log = "0.4"
quick-protobuf = "0.8"
quick-protobuf-codec = { workspace = true }
@ -27,7 +29,6 @@ thiserror = "1"
void = "1"
[dev-dependencies]
async-trait = "0.1"
env_logger = "0.10.0"
libp2p-swarm = { workspace = true, features = ["macros", "tokio"] }
libp2p-noise = { workspace = true }

View File

@ -18,32 +18,33 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl};
use crate::handler;
use crate::handler::outbound;
use crate::handler::outbound::OpenInfo;
use crate::substream_handler::{InEvent, SubstreamConnectionHandler};
use crate::codec::Message::*;
use crate::codec::{Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, Ttl};
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use instant::Duration;
use libp2p_core::{Endpoint, Multiaddr, PeerRecord};
use libp2p_identity::{Keypair, PeerId, SigningError};
use libp2p_swarm::behaviour::FromSwarm;
use libp2p_request_response::{ProtocolSupport, RequestId};
use libp2p_swarm::{
CloseConnection, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour,
NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, PollParameters,
THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use std::collections::{HashMap, VecDeque};
use std::iter::FromIterator;
use std::iter;
use std::task::{Context, Poll};
use void::Void;
use std::time::Duration;
pub struct Behaviour {
events: VecDeque<ToSwarm<Event, InEvent<outbound::OpenInfo, Void, Void>>>,
inner: libp2p_request_response::Behaviour<crate::codec::Codec>,
keypair: Keypair,
pending_register_requests: Vec<(Namespace, PeerId, Option<Ttl>)>,
error_events: VecDeque<Event>,
waiting_for_register: HashMap<RequestId, (PeerId, Namespace)>,
waiting_for_discovery: HashMap<RequestId, (PeerId, Option<Namespace>)>,
/// Hold addresses of all peers that we have discovered so far.
///
@ -60,9 +61,15 @@ impl Behaviour {
/// Create a new instance of the rendezvous [`NetworkBehaviour`].
pub fn new(keypair: Keypair) -> Self {
Self {
events: Default::default(),
inner: libp2p_request_response::Behaviour::with_codec(
crate::codec::Codec::default(),
iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Outbound)),
libp2p_request_response::Config::default(),
),
error_events: Default::default(),
keypair,
pending_register_requests: vec![],
waiting_for_register: Default::default(),
waiting_for_discovery: Default::default(),
discovered_peers: Default::default(),
expiring_registrations: FuturesUnordered::from_iter(vec![
futures::future::pending().boxed()
@ -76,19 +83,35 @@ impl Behaviour {
/// External addresses are either manually added via [`libp2p_swarm::Swarm::add_external_address`] or reported
/// by other [`NetworkBehaviour`]s via [`ToSwarm::ExternalAddrConfirmed`].
pub fn register(&mut self, namespace: Namespace, rendezvous_node: PeerId, ttl: Option<Ttl>) {
self.pending_register_requests
.push((namespace, rendezvous_node, ttl));
let external_addresses = self.external_addresses.iter().cloned().collect::<Vec<_>>();
if external_addresses.is_empty() {
self.error_events
.push_back(Event::RegisterFailed(RegisterError::NoExternalAddresses));
return;
}
match PeerRecord::new(&self.keypair, external_addresses) {
Ok(peer_record) => {
let req_id = self.inner.send_request(
&rendezvous_node,
Register(NewRegistration::new(namespace.clone(), peer_record, ttl)),
);
self.waiting_for_register
.insert(req_id, (rendezvous_node, namespace));
}
Err(signing_error) => {
self.error_events.push_back(Event::RegisterFailed(
RegisterError::FailedToMakeRecord(signing_error),
));
}
};
}
/// Unregister ourselves from the given namespace with the given rendezvous peer.
pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: rendezvous_node,
event: handler::OutboundInEvent::NewSubstream {
open_info: OpenInfo::UnregisterRequest(namespace),
},
handler: NotifyHandler::Any,
});
self.inner
.send_request(&rendezvous_node, Unregister(namespace));
}
/// Discover other peers at a given rendezvous peer.
@ -100,22 +123,22 @@ impl Behaviour {
/// the cookie was acquired.
pub fn discover(
&mut self,
ns: Option<Namespace>,
namespace: Option<Namespace>,
cookie: Option<Cookie>,
limit: Option<u64>,
rendezvous_node: PeerId,
) {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: rendezvous_node,
event: handler::OutboundInEvent::NewSubstream {
open_info: OpenInfo::DiscoverRequest {
namespace: ns,
cookie,
limit,
},
let req_id = self.inner.send_request(
&rendezvous_node,
Discover {
namespace: namespace.clone(),
cookie,
limit,
},
handler: NotifyHandler::Any,
});
);
self.waiting_for_discovery
.insert(req_id, (rendezvous_node, namespace));
}
}
@ -161,20 +184,130 @@ pub enum Event {
}
impl NetworkBehaviour for Behaviour {
type ConnectionHandler =
SubstreamConnectionHandler<void::Void, outbound::Stream, outbound::OpenInfo>;
type ConnectionHandler = <libp2p_request_response::Behaviour<
crate::codec::Codec,
> as NetworkBehaviour>::ConnectionHandler;
type ToSwarm = Event;
fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: &Multiaddr,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(SubstreamConnectionHandler::new_outbound_only(
Duration::from_secs(30),
))
self.inner.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}
fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
self.inner
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.inner
.on_connection_handler_event(peer_id, connection_id, event);
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
self.external_addresses.on_swarm_event(&event);
self.inner.on_swarm_event(event);
}
fn poll(
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
use libp2p_request_response as req_res;
if let Some(event) = self.error_events.pop_front() {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}
loop {
match self.inner.poll(cx, params) {
Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::Message {
message:
req_res::Message::Response {
request_id,
response,
},
..
})) => {
if let Some(event) = self.handle_response(&request_id, response) {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}
continue; // not a request we care about
}
Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::OutboundFailure {
request_id,
..
})) => {
if let Some(event) = self.event_for_outbound_failure(&request_id) {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}
continue; // not a request we care about
}
Poll::Ready(ToSwarm::GenerateEvent(
req_res::Event::InboundFailure { .. }
| req_res::Event::ResponseSent { .. }
| req_res::Event::Message {
message: req_res::Message::Request { .. },
..
},
)) => {
unreachable!("rendezvous clients never receive requests")
}
Poll::Ready(
other @ (ToSwarm::ExternalAddrConfirmed(_)
| ToSwarm::ExternalAddrExpired(_)
| ToSwarm::NewExternalAddrCandidate(_)
| ToSwarm::NotifyHandler { .. }
| ToSwarm::Dial { .. }
| ToSwarm::CloseConnection { .. }
| ToSwarm::ListenOn { .. }
| ToSwarm::RemoveListener { .. }),
) => {
let new_to_swarm =
other.map_out(|_| unreachable!("we manually map `GenerateEvent` variants"));
return Poll::Ready(new_to_swarm);
}
Poll::Pending => {}
}
if let Poll::Ready(Some(expired_registration)) =
self.expiring_registrations.poll_next_unpin(cx)
{
self.discovered_peers.remove(&expired_registration);
return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired {
peer: expired_registration.0,
}));
}
return Poll::Pending;
}
}
fn handle_pending_outbound_connection(
@ -199,178 +332,103 @@ impl NetworkBehaviour for Behaviour {
Ok(addresses)
}
}
fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(SubstreamConnectionHandler::new_outbound_only(
Duration::from_secs(30),
))
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
let new_events = match event {
handler::OutboundOutEvent::InboundEvent { message, .. } => void::unreachable(message),
handler::OutboundOutEvent::OutboundEvent { message, .. } => handle_outbound_event(
message,
peer_id,
&mut self.discovered_peers,
&mut self.expiring_registrations,
),
handler::OutboundOutEvent::InboundError { error, .. } => void::unreachable(error),
handler::OutboundOutEvent::OutboundError { error, .. } => {
log::warn!("Connection with peer {} failed: {}", peer_id, error);
vec![ToSwarm::CloseConnection {
peer_id,
connection: CloseConnection::One(connection_id),
}]
}
impl Behaviour {
fn event_for_outbound_failure(&mut self, req_id: &RequestId) -> Option<Event> {
if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) {
return Some(Event::RegisterFailed(RegisterError::Remote {
rendezvous_node,
namespace,
error: ErrorCode::Unavailable,
}));
};
self.events.extend(new_events);
}
fn poll(
&mut self,
cx: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
if let Some((namespace, rendezvous_node, ttl)) = self.pending_register_requests.pop() {
// Update our external addresses based on the Swarm's current knowledge.
// It doesn't make sense to register addresses on which we are not reachable, hence this should not be configurable from the outside.
let external_addresses = self.external_addresses.iter().cloned().collect::<Vec<_>>();
if external_addresses.is_empty() {
return Poll::Ready(ToSwarm::GenerateEvent(Event::RegisterFailed(
RegisterError::NoExternalAddresses,
)));
}
let action = match PeerRecord::new(&self.keypair, external_addresses) {
Ok(peer_record) => ToSwarm::NotifyHandler {
peer_id: rendezvous_node,
event: handler::OutboundInEvent::NewSubstream {
open_info: OpenInfo::RegisterRequest(NewRegistration {
namespace,
record: peer_record,
ttl,
}),
},
handler: NotifyHandler::Any,
},
Err(signing_error) => ToSwarm::GenerateEvent(Event::RegisterFailed(
RegisterError::FailedToMakeRecord(signing_error),
)),
};
return Poll::Ready(action);
}
if let Some(expired_registration) =
futures::ready!(self.expiring_registrations.poll_next_unpin(cx))
{
self.discovered_peers.remove(&expired_registration);
return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired {
peer: expired_registration.0,
}));
}
Poll::Pending
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
self.external_addresses.on_swarm_event(&event);
match event {
FromSwarm::ConnectionEstablished(_)
| FromSwarm::ConnectionClosed(_)
| FromSwarm::AddressChange(_)
| FromSwarm::DialFailure(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrExpired(_)
| FromSwarm::ExternalAddrConfirmed(_) => {}
}
}
}
fn handle_outbound_event(
event: outbound::OutEvent,
peer_id: PeerId,
discovered_peers: &mut HashMap<(PeerId, Namespace), Vec<Multiaddr>>,
expiring_registrations: &mut FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>,
) -> Vec<ToSwarm<Event, THandlerInEvent<Behaviour>>> {
match event {
outbound::OutEvent::Registered { namespace, ttl } => {
vec![ToSwarm::GenerateEvent(Event::Registered {
rendezvous_node: peer_id,
ttl,
if let Some((rendezvous_node, namespace)) = self.waiting_for_discovery.remove(req_id) {
return Some(Event::DiscoverFailed {
rendezvous_node,
namespace,
})]
}
outbound::OutEvent::RegisterFailed(namespace, error) => {
vec![ToSwarm::GenerateEvent(Event::RegisterFailed(
RegisterError::Remote {
rendezvous_node: peer_id,
namespace,
error,
},
))]
}
outbound::OutEvent::Discovered {
registrations,
cookie,
} => {
discovered_peers.extend(registrations.iter().map(|registration| {
let peer_id = registration.record.peer_id();
let namespace = registration.namespace.clone();
error: ErrorCode::Unavailable,
});
};
let addresses = registration.record.addresses().to_vec();
None
}
((peer_id, namespace), addresses)
}));
expiring_registrations.extend(registrations.iter().cloned().map(|registration| {
async move {
// if the timer errors we consider it expired
futures_timer::Delay::new(Duration::from_secs(registration.ttl)).await;
(registration.record.peer_id(), registration.namespace)
fn handle_response(&mut self, request_id: &RequestId, response: Message) -> Option<Event> {
match response {
RegisterResponse(Ok(ttl)) => {
if let Some((rendezvous_node, namespace)) =
self.waiting_for_register.remove(request_id)
{
return Some(Event::Registered {
rendezvous_node,
ttl,
namespace,
});
}
.boxed()
}));
vec![ToSwarm::GenerateEvent(Event::Discovered {
rendezvous_node: peer_id,
registrations,
cookie,
})]
}
outbound::OutEvent::DiscoverFailed { namespace, error } => {
vec![ToSwarm::GenerateEvent(Event::DiscoverFailed {
rendezvous_node: peer_id,
namespace,
error,
})]
None
}
RegisterResponse(Err(error_code)) => {
if let Some((rendezvous_node, namespace)) =
self.waiting_for_register.remove(request_id)
{
return Some(Event::RegisterFailed(RegisterError::Remote {
rendezvous_node,
namespace,
error: error_code,
}));
}
None
}
DiscoverResponse(Ok((registrations, cookie))) => {
if let Some((rendezvous_node, _ns)) = self.waiting_for_discovery.remove(request_id)
{
self.discovered_peers
.extend(registrations.iter().map(|registration| {
let peer_id = registration.record.peer_id();
let namespace = registration.namespace.clone();
let addresses = registration.record.addresses().to_vec();
((peer_id, namespace), addresses)
}));
self.expiring_registrations
.extend(registrations.iter().cloned().map(|registration| {
async move {
// if the timer errors we consider it expired
futures_timer::Delay::new(Duration::from_secs(registration.ttl))
.await;
(registration.record.peer_id(), registration.namespace)
}
.boxed()
}));
return Some(Event::Discovered {
rendezvous_node,
registrations,
cookie,
});
}
None
}
DiscoverResponse(Err(error_code)) => {
if let Some((rendezvous_node, ns)) = self.waiting_for_discovery.remove(request_id) {
return Some(Event::DiscoverFailed {
rendezvous_node,
namespace: ns,
error: error_code,
});
}
None
}
_ => unreachable!("rendezvous clients never receive requests"),
}
}
}

View File

@ -19,16 +19,24 @@
// DEALINGS IN THE SOFTWARE.
use crate::DEFAULT_TTL;
use async_trait::async_trait;
use asynchronous_codec::{BytesMut, Decoder, Encoder};
use asynchronous_codec::{FramedRead, FramedWrite};
use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt};
use libp2p_core::{peer_record, signed_envelope, PeerRecord, SignedEnvelope};
use libp2p_swarm::StreamProtocol;
use quick_protobuf_codec::Codec as ProtobufCodec;
use rand::RngCore;
use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::{fmt, io};
pub type Ttl = u64;
pub(crate) type Limit = u64;
const MAX_MESSAGE_LEN_BYTES: usize = 1024 * 1024;
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum Message {
Register(NewRegistration),
RegisterResponse(Result<Ttl, ErrorCode>),
@ -36,7 +44,7 @@ pub enum Message {
Discover {
namespace: Option<Namespace>,
cookie: Option<Cookie>,
limit: Option<Ttl>,
limit: Option<Limit>,
},
DiscoverResponse(Result<(Vec<Registration>, Cookie), ErrorCode>),
}
@ -49,7 +57,7 @@ impl Namespace {
///
/// This will panic if the namespace is too long. We accepting panicking in this case because we are enforcing a `static lifetime which means this value can only be a constant in the program and hence we hope the developer checked that it is of an acceptable length.
pub fn from_static(value: &'static str) -> Self {
if value.len() > 255 {
if value.len() > crate::MAX_NAMESPACE {
panic!("Namespace '{value}' is too long!")
}
@ -57,7 +65,7 @@ impl Namespace {
}
pub fn new(value: String) -> Result<Self, NamespaceTooLong> {
if value.len() > 255 {
if value.len() > crate::MAX_NAMESPACE {
return Err(NamespaceTooLong);
}
@ -160,7 +168,7 @@ impl Cookie {
#[error("The cookie was malformed")]
pub struct InvalidCookie;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct NewRegistration {
pub namespace: Namespace,
pub record: PeerRecord,
@ -199,35 +207,27 @@ pub enum ErrorCode {
Unavailable,
}
pub struct RendezvousCodec {
inner: quick_protobuf_codec::Codec<proto::Message>,
}
impl Default for RendezvousCodec {
fn default() -> Self {
Self {
inner: quick_protobuf_codec::Codec::new(1024 * 1024), // 1MB
}
}
}
impl Encoder for RendezvousCodec {
impl Encoder for Codec {
type Item = Message;
type Error = Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
self.inner.encode(proto::Message::from(item), dst)?;
let mut pb: ProtobufCodec<proto::Message> = ProtobufCodec::new(MAX_MESSAGE_LEN_BYTES);
pb.encode(proto::Message::from(item), dst)?;
Ok(())
}
}
impl Decoder for RendezvousCodec {
impl Decoder for Codec {
type Item = Message;
type Error = Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let message = match self.inner.decode(src)? {
let mut pb: ProtobufCodec<proto::Message> = ProtobufCodec::new(MAX_MESSAGE_LEN_BYTES);
let message = match pb.decode(src)? {
Some(p) => p,
None => return Ok(None),
};
@ -236,6 +236,72 @@ impl Decoder for RendezvousCodec {
}
}
#[derive(Clone, Default)]
pub struct Codec {}
#[async_trait]
impl libp2p_request_response::Codec for Codec {
type Protocol = StreamProtocol;
type Request = Message;
type Response = Message;
async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
let message = FramedRead::new(io, self.clone())
.next()
.await
.ok_or(io::ErrorKind::UnexpectedEof)??;
Ok(message)
}
async fn read_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
let message = FramedRead::new(io, self.clone())
.next()
.await
.ok_or(io::ErrorKind::UnexpectedEof)??;
Ok(message)
}
async fn write_request<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
FramedWrite::new(io, self.clone()).send(req).await?;
Ok(())
}
async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
res: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
FramedWrite::new(io, self.clone()).send(res).await?;
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
@ -246,6 +312,16 @@ pub enum Error {
Conversion(#[from] ConversionError),
}
impl From<Error> for std::io::Error {
fn from(value: Error) -> Self {
match value {
Error::Io(e) => e,
Error::Codec(e) => io::Error::from(e),
Error::Conversion(e) => io::Error::new(io::ErrorKind::InvalidInput, e),
}
}
}
impl From<Message> for proto::Message {
fn from(message: Message) -> Self {
match message {
@ -528,7 +604,7 @@ impl TryFrom<proto::ResponseStatus> for ErrorCode {
E_UNAVAILABLE => ErrorCode::Unavailable,
};
Result::Ok(code)
Ok(code)
}
}
@ -567,6 +643,7 @@ mod proto {
#[cfg(test)]
mod tests {
use super::*;
use crate::Namespace;
#[test]
fn cookie_wire_encoding_roundtrip() {

View File

@ -1,50 +0,0 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::codec;
use crate::codec::Message;
use libp2p_swarm::StreamProtocol;
use void::Void;
const PROTOCOL_IDENT: StreamProtocol = StreamProtocol::new("/rendezvous/1.0.0");
pub(crate) mod inbound;
pub(crate) mod outbound;
/// Errors that can occur while interacting with a substream.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Reading message {0:?} at this stage is a protocol violation")]
BadMessage(Message),
#[error("Failed to write message to substream")]
WriteMessage(#[source] codec::Error),
#[error("Failed to read message from substream")]
ReadMessage(#[source] codec::Error),
#[error("Substream ended unexpectedly mid-protocol")]
UnexpectedEndOfStream,
}
pub(crate) type OutboundInEvent = crate::substream_handler::InEvent<outbound::OpenInfo, Void, Void>;
pub(crate) type OutboundOutEvent =
crate::substream_handler::OutEvent<Void, outbound::OutEvent, Void, Error>;
pub(crate) type InboundInEvent = crate::substream_handler::InEvent<(), inbound::InEvent, Void>;
pub(crate) type InboundOutEvent =
crate::substream_handler::OutEvent<inbound::OutEvent, Void, Error, Void>;

View File

@ -1,192 +0,0 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::codec::{
Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, RendezvousCodec, Ttl,
};
use crate::handler::Error;
use crate::handler::PROTOCOL_IDENT;
use crate::substream_handler::{Next, PassthroughProtocol, SubstreamHandler};
use asynchronous_codec::Framed;
use futures::{SinkExt, StreamExt};
use libp2p_swarm::SubstreamProtocol;
use std::fmt;
use std::task::{Context, Poll};
/// The state of an inbound substream (i.e. the remote node opened it).
#[allow(clippy::large_enum_variant)]
#[allow(clippy::enum_variant_names)]
pub enum Stream {
/// We are in the process of reading a message from the substream.
PendingRead(Framed<libp2p_swarm::Stream, RendezvousCodec>),
/// We read a message, dispatched it to the behaviour and are waiting for the response.
PendingBehaviour(Framed<libp2p_swarm::Stream, RendezvousCodec>),
/// We are in the process of sending a response.
PendingSend(Framed<libp2p_swarm::Stream, RendezvousCodec>, Message),
/// We've sent the message and are now closing down the substream.
PendingClose(Framed<libp2p_swarm::Stream, RendezvousCodec>),
}
impl fmt::Debug for Stream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Stream::PendingRead(_) => write!(f, "Inbound::PendingRead"),
Stream::PendingBehaviour(_) => write!(f, "Inbound::PendingBehaviour"),
Stream::PendingSend(_, _) => write!(f, "Inbound::PendingSend"),
Stream::PendingClose(_) => write!(f, "Inbound::PendingClose"),
}
}
}
#[allow(clippy::large_enum_variant)]
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Clone)]
pub enum OutEvent {
RegistrationRequested(NewRegistration),
UnregisterRequested(Namespace),
DiscoverRequested {
namespace: Option<Namespace>,
cookie: Option<Cookie>,
limit: Option<u64>,
},
}
#[derive(Debug)]
pub enum InEvent {
RegisterResponse {
ttl: Ttl,
},
DeclineRegisterRequest(ErrorCode),
DiscoverResponse {
discovered: Vec<Registration>,
cookie: Cookie,
},
DeclineDiscoverRequest(ErrorCode),
}
impl SubstreamHandler for Stream {
type InEvent = InEvent;
type OutEvent = OutEvent;
type Error = Error;
type OpenInfo = ();
fn upgrade(
open_info: Self::OpenInfo,
) -> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo> {
SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info)
}
fn new(substream: libp2p_swarm::Stream, _: Self::OpenInfo) -> Self {
Stream::PendingRead(Framed::new(substream, RendezvousCodec::default()))
}
fn on_event(self, event: Self::InEvent) -> Self {
match (event, self) {
(InEvent::RegisterResponse { ttl }, Stream::PendingBehaviour(substream)) => {
Stream::PendingSend(substream, Message::RegisterResponse(Ok(ttl)))
}
(InEvent::DeclineRegisterRequest(error), Stream::PendingBehaviour(substream)) => {
Stream::PendingSend(substream, Message::RegisterResponse(Err(error)))
}
(
InEvent::DiscoverResponse { discovered, cookie },
Stream::PendingBehaviour(substream),
) => Stream::PendingSend(
substream,
Message::DiscoverResponse(Ok((discovered, cookie))),
),
(InEvent::DeclineDiscoverRequest(error), Stream::PendingBehaviour(substream)) => {
Stream::PendingSend(substream, Message::DiscoverResponse(Err(error)))
}
(event, inbound) => {
debug_assert!(false, "{inbound:?} cannot handle event {event:?}");
inbound
}
}
}
fn advance(self, cx: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error> {
let next_state = match self {
Stream::PendingRead(mut substream) => {
match substream.poll_next_unpin(cx).map_err(Error::ReadMessage)? {
Poll::Ready(Some(msg)) => {
let event = match msg {
Message::Register(registration) => {
OutEvent::RegistrationRequested(registration)
}
Message::Discover {
cookie,
namespace,
limit,
} => OutEvent::DiscoverRequested {
cookie,
namespace,
limit,
},
Message::Unregister(namespace) => {
OutEvent::UnregisterRequested(namespace)
}
other => return Err(Error::BadMessage(other)),
};
Next::EmitEvent {
event,
next_state: Stream::PendingBehaviour(substream),
}
}
Poll::Ready(None) => return Err(Error::UnexpectedEndOfStream),
Poll::Pending => Next::Pending {
next_state: Stream::PendingRead(substream),
},
}
}
Stream::PendingBehaviour(substream) => Next::Pending {
next_state: Stream::PendingBehaviour(substream),
},
Stream::PendingSend(mut substream, message) => match substream
.poll_ready_unpin(cx)
.map_err(Error::WriteMessage)?
{
Poll::Ready(()) => {
substream
.start_send_unpin(message)
.map_err(Error::WriteMessage)?;
Next::Continue {
next_state: Stream::PendingClose(substream),
}
}
Poll::Pending => Next::Pending {
next_state: Stream::PendingSend(substream, message),
},
},
Stream::PendingClose(mut substream) => match substream.poll_close_unpin(cx) {
Poll::Ready(Ok(())) => Next::Done,
Poll::Ready(Err(_)) => Next::Done, // there is nothing we can do about an error during close
Poll::Pending => Next::Pending {
next_state: Stream::PendingClose(substream),
},
},
};
Ok(next_state)
}
}

View File

@ -1,134 +0,0 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::codec::{Cookie, Message, NewRegistration, RendezvousCodec};
use crate::handler::Error;
use crate::handler::PROTOCOL_IDENT;
use crate::substream_handler::{FutureSubstream, Next, PassthroughProtocol, SubstreamHandler};
use crate::{ErrorCode, Namespace, Registration, Ttl};
use asynchronous_codec::Framed;
use futures::{SinkExt, TryFutureExt, TryStreamExt};
use libp2p_swarm::SubstreamProtocol;
use std::task::Context;
use void::Void;
pub struct Stream(FutureSubstream<OutEvent, Error>);
impl SubstreamHandler for Stream {
type InEvent = Void;
type OutEvent = OutEvent;
type Error = Error;
type OpenInfo = OpenInfo;
fn upgrade(
open_info: Self::OpenInfo,
) -> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo> {
SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info)
}
fn new(substream: libp2p_swarm::Stream, info: Self::OpenInfo) -> Self {
let mut stream = Framed::new(substream, RendezvousCodec::default());
let sent_message = match info {
OpenInfo::RegisterRequest(new_registration) => Message::Register(new_registration),
OpenInfo::UnregisterRequest(namespace) => Message::Unregister(namespace),
OpenInfo::DiscoverRequest {
namespace,
cookie,
limit,
} => Message::Discover {
namespace,
cookie,
limit,
},
};
Self(FutureSubstream::new(async move {
use Message::*;
use OutEvent::*;
stream
.send(sent_message.clone())
.map_err(Error::WriteMessage)
.await?;
let received_message = stream.try_next().map_err(Error::ReadMessage).await?;
let received_message = received_message.ok_or(Error::UnexpectedEndOfStream)?;
let event = match (sent_message, received_message) {
(Register(registration), RegisterResponse(Ok(ttl))) => Registered {
namespace: registration.namespace,
ttl,
},
(Register(registration), RegisterResponse(Err(error))) => {
RegisterFailed(registration.namespace, error)
}
(Discover { .. }, DiscoverResponse(Ok((registrations, cookie)))) => Discovered {
registrations,
cookie,
},
(Discover { namespace, .. }, DiscoverResponse(Err(error))) => {
DiscoverFailed { namespace, error }
}
(.., other) => return Err(Error::BadMessage(other)),
};
stream.close().map_err(Error::WriteMessage).await?;
Ok(event)
}))
}
fn on_event(self, event: Self::InEvent) -> Self {
void::unreachable(event)
}
fn advance(self, cx: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error> {
Ok(self.0.advance(cx)?.map_state(Stream))
}
}
#[derive(Debug, Clone)]
pub enum OutEvent {
Registered {
namespace: Namespace,
ttl: Ttl,
},
RegisterFailed(Namespace, ErrorCode),
Discovered {
registrations: Vec<Registration>,
cookie: Cookie,
},
DiscoverFailed {
namespace: Option<Namespace>,
error: ErrorCode,
},
}
#[allow(clippy::large_enum_variant)]
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
pub enum OpenInfo {
RegisterRequest(NewRegistration),
UnregisterRequest(Namespace),
DiscoverRequest {
namespace: Option<Namespace>,
cookie: Option<Cookie>,
limit: Option<Ttl>,
},
}

View File

@ -23,10 +23,9 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
pub use self::codec::{Cookie, ErrorCode, Namespace, NamespaceTooLong, Registration, Ttl};
use libp2p_swarm::StreamProtocol;
mod codec;
mod handler;
mod substream_handler;
/// If unspecified, rendezvous nodes should assume a TTL of 2h.
///
@ -43,5 +42,12 @@ pub const MIN_TTL: Ttl = 60 * 60 * 2;
/// <https://github.com/libp2p/specs/tree/master/rendezvous#recommendations-for-rendezvous-points-configurations>.
pub const MAX_TTL: Ttl = 60 * 60 * 72;
/// The maximum namespace length.
///
/// <https://github.com/libp2p/specs/tree/master/rendezvous#recommendations-for-rendezvous-points-configurations>.
pub const MAX_NAMESPACE: usize = 255;
pub(crate) const PROTOCOL_IDENT: StreamProtocol = StreamProtocol::new("/rendezvous/1.0.0");
pub mod client;
pub mod server;

View File

@ -18,30 +18,29 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl};
use crate::handler::inbound;
use crate::substream_handler::{InEvent, InboundSubstreamId, SubstreamConnectionHandler};
use crate::{handler, MAX_TTL, MIN_TTL};
use crate::codec::{Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, Ttl};
use crate::{MAX_TTL, MIN_TTL};
use bimap::BiMap;
use futures::future::BoxFuture;
use futures::ready;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_request_response::ProtocolSupport;
use libp2p_swarm::behaviour::FromSwarm;
use libp2p_swarm::{
CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};
use std::iter;
use std::iter::FromIterator;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use std::time::Duration;
use void::Void;
pub struct Behaviour {
events: VecDeque<ToSwarm<Event, InEvent<(), inbound::InEvent, Void>>>,
inner: libp2p_request_response::Behaviour<crate::codec::Codec>,
registrations: Registrations,
}
@ -75,7 +74,12 @@ impl Behaviour {
/// Create a new instance of the rendezvous [`NetworkBehaviour`].
pub fn new(config: Config) -> Self {
Self {
events: Default::default(),
inner: libp2p_request_response::Behaviour::with_codec(
crate::codec::Codec::default(),
iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Inbound)),
libp2p_request_response::Config::default(),
),
registrations: Registrations::with_config(config),
}
}
@ -109,31 +113,36 @@ pub enum Event {
}
impl NetworkBehaviour for Behaviour {
type ConnectionHandler = SubstreamConnectionHandler<inbound::Stream, Void, ()>;
type ConnectionHandler = <libp2p_request_response::Behaviour<
crate::codec::Codec,
> as NetworkBehaviour>::ConnectionHandler;
type ToSwarm = Event;
fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: &Multiaddr,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(SubstreamConnectionHandler::new_inbound_only(
Duration::from_secs(30),
))
self.inner.handle_established_inbound_connection(
connection_id,
peer,
local_addr,
remote_addr,
)
}
fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
_: Endpoint,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(SubstreamConnectionHandler::new_inbound_only(
Duration::from_secs(30),
))
self.inner
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
}
fn on_connection_handler_event(
@ -142,29 +151,14 @@ impl NetworkBehaviour for Behaviour {
connection: ConnectionId,
event: THandlerOutEvent<Self>,
) {
let new_events = match event {
handler::InboundOutEvent::InboundEvent { id, message } => {
handle_inbound_event(message, peer_id, connection, id, &mut self.registrations)
}
handler::InboundOutEvent::OutboundEvent { message, .. } => void::unreachable(message),
handler::InboundOutEvent::InboundError { error, .. } => {
log::warn!("Connection with peer {} failed: {}", peer_id, error);
vec![ToSwarm::CloseConnection {
peer_id,
connection: CloseConnection::One(connection),
}]
}
handler::InboundOutEvent::OutboundError { error, .. } => void::unreachable(error),
};
self.events.extend(new_events);
self.inner
.on_connection_handler_event(peer_id, connection, event);
}
fn poll(
&mut self,
cx: &mut Context<'_>,
_: &mut impl PollParameters,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Poll::Ready(ExpiredRegistration(registration)) = self.registrations.poll(cx) {
return Poll::Ready(ToSwarm::GenerateEvent(Event::RegistrationExpired(
@ -172,106 +166,134 @@ impl NetworkBehaviour for Behaviour {
)));
}
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
loop {
if let Poll::Ready(to_swarm) = self.inner.poll(cx, params) {
match to_swarm {
ToSwarm::GenerateEvent(libp2p_request_response::Event::Message {
peer: peer_id,
message:
libp2p_request_response::Message::Request {
request, channel, ..
},
}) => {
if let Some((event, response)) =
handle_request(peer_id, request, &mut self.registrations)
{
if let Some(resp) = response {
self.inner
.send_response(channel, resp)
.expect("Send response");
}
Poll::Pending
return Poll::Ready(ToSwarm::GenerateEvent(event));
}
continue;
}
ToSwarm::GenerateEvent(libp2p_request_response::Event::InboundFailure {
peer,
request_id,
error,
}) => {
log::warn!("Inbound request {request_id} with peer {peer} failed: {error}");
continue;
}
ToSwarm::GenerateEvent(libp2p_request_response::Event::ResponseSent {
..
})
| ToSwarm::GenerateEvent(libp2p_request_response::Event::Message {
peer: _,
message: libp2p_request_response::Message::Response { .. },
})
| ToSwarm::GenerateEvent(libp2p_request_response::Event::OutboundFailure {
..
}) => {
continue;
}
ToSwarm::Dial { .. }
| ToSwarm::ListenOn { .. }
| ToSwarm::RemoveListener { .. }
| ToSwarm::NotifyHandler { .. }
| ToSwarm::NewExternalAddrCandidate(_)
| ToSwarm::ExternalAddrConfirmed(_)
| ToSwarm::ExternalAddrExpired(_)
| ToSwarm::CloseConnection { .. } => {
let new_to_swarm = to_swarm
.map_out(|_| unreachable!("we manually map `GenerateEvent` variants"));
return Poll::Ready(new_to_swarm);
}
};
}
return Poll::Pending;
}
}
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
match event {
FromSwarm::ConnectionEstablished(_)
| FromSwarm::ConnectionClosed(_)
| FromSwarm::AddressChange(_)
| FromSwarm::DialFailure(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::ListenerError(_)
| FromSwarm::ListenerClosed(_)
| FromSwarm::NewExternalAddrCandidate(_)
| FromSwarm::ExternalAddrExpired(_)
| FromSwarm::ExternalAddrConfirmed(_) => {}
}
self.inner.on_swarm_event(event);
}
}
fn handle_inbound_event(
event: inbound::OutEvent,
fn handle_request(
peer_id: PeerId,
connection: ConnectionId,
id: InboundSubstreamId,
message: Message,
registrations: &mut Registrations,
) -> Vec<ToSwarm<Event, THandlerInEvent<Behaviour>>> {
match event {
// bad registration
inbound::OutEvent::RegistrationRequested(registration)
if registration.record.peer_id() != peer_id =>
{
let error = ErrorCode::NotAuthorized;
) -> Option<(Event, Option<Message>)> {
match message {
Message::Register(registration) => {
if registration.record.peer_id() != peer_id {
let error = ErrorCode::NotAuthorized;
vec![
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream {
id,
message: inbound::InEvent::DeclineRegisterRequest(error),
},
},
ToSwarm::GenerateEvent(Event::PeerNotRegistered {
let event = Event::PeerNotRegistered {
peer: peer_id,
namespace: registration.namespace,
error,
}),
]
}
inbound::OutEvent::RegistrationRequested(registration) => {
};
return Some((event, Some(Message::RegisterResponse(Err(error)))));
}
let namespace = registration.namespace.clone();
match registrations.add(registration) {
Ok(registration) => {
vec![
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream {
id,
message: inbound::InEvent::RegisterResponse {
ttl: registration.ttl,
},
},
},
ToSwarm::GenerateEvent(Event::PeerRegistered {
peer: peer_id,
registration,
}),
]
let response = Message::RegisterResponse(Ok(registration.ttl));
let event = Event::PeerRegistered {
peer: peer_id,
registration,
};
Some((event, Some(response)))
}
Err(TtlOutOfRange::TooLong { .. }) | Err(TtlOutOfRange::TooShort { .. }) => {
let error = ErrorCode::InvalidTtl;
vec![
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream {
id,
message: inbound::InEvent::DeclineRegisterRequest(error),
},
},
ToSwarm::GenerateEvent(Event::PeerNotRegistered {
peer: peer_id,
namespace,
error,
}),
]
let response = Message::RegisterResponse(Err(error));
let event = Event::PeerNotRegistered {
peer: peer_id,
namespace,
error,
};
Some((event, Some(response)))
}
}
}
inbound::OutEvent::DiscoverRequested {
Message::Unregister(namespace) => {
registrations.remove(namespace.clone(), peer_id);
let event = Event::PeerUnregistered {
peer: peer_id,
namespace,
};
Some((event, None))
}
Message::Discover {
namespace,
cookie,
limit,
@ -279,51 +301,30 @@ fn handle_inbound_event(
Ok((registrations, cookie)) => {
let discovered = registrations.cloned().collect::<Vec<_>>();
vec![
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream {
id,
message: inbound::InEvent::DiscoverResponse {
discovered: discovered.clone(),
cookie,
},
},
},
ToSwarm::GenerateEvent(Event::DiscoverServed {
enquirer: peer_id,
registrations: discovered,
}),
]
let response = Message::DiscoverResponse(Ok((discovered.clone(), cookie)));
let event = Event::DiscoverServed {
enquirer: peer_id,
registrations: discovered,
};
Some((event, Some(response)))
}
Err(_) => {
let error = ErrorCode::InvalidCookie;
vec![
ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream {
id,
message: inbound::InEvent::DeclineDiscoverRequest(error),
},
},
ToSwarm::GenerateEvent(Event::DiscoverNotServed {
enquirer: peer_id,
error,
}),
]
let response = Message::DiscoverResponse(Err(error));
let event = Event::DiscoverNotServed {
enquirer: peer_id,
error,
};
Some((event, Some(response)))
}
},
inbound::OutEvent::UnregisterRequested(namespace) => {
registrations.remove(namespace.clone(), peer_id);
vec![ToSwarm::GenerateEvent(Event::PeerUnregistered {
peer: peer_id,
namespace,
})]
}
Message::RegisterResponse(_) => None,
Message::DiscoverResponse(_) => None,
}
}
@ -488,32 +489,38 @@ impl Registrations {
self.cookies
.insert(new_cookie.clone(), reggos_of_last_discover);
let reggos = &self.registrations;
let regs = &self.registrations;
let registrations = ids
.into_iter()
.map(move |id| reggos.get(&id).expect("bad internal datastructure"));
.map(move |id| regs.get(&id).expect("bad internal data structure"));
Ok((registrations, new_cookie))
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ExpiredRegistration> {
let expired_registration = ready!(self.next_expiry.poll_next_unpin(cx)).expect(
"This stream should never finish because it is initialised with a pending future",
);
loop {
let expired_registration = ready!(self.next_expiry.poll_next_unpin(cx)).expect(
"This stream should never finish because it is initialised with a pending future",
);
// clean up our cookies
self.cookies.retain(|_, registrations| {
registrations.remove(&expired_registration);
// clean up our cookies
self.cookies.retain(|_, registrations| {
registrations.remove(&expired_registration);
// retain all cookies where there are still registrations left
!registrations.is_empty()
});
// retain all cookies where there are still registrations left
!registrations.is_empty()
});
self.registrations_for_peer
.remove_by_right(&expired_registration);
match self.registrations.remove(&expired_registration) {
None => self.poll(cx),
Some(registration) => Poll::Ready(ExpiredRegistration(registration)),
self.registrations_for_peer
.remove_by_right(&expired_registration);
match self.registrations.remove(&expired_registration) {
None => {
continue;
}
Some(registration) => {
return Poll::Ready(ExpiredRegistration(registration));
}
}
}
}
}

View File

@ -1,556 +0,0 @@
// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! A generic [`ConnectionHandler`] that delegates the handling of substreams to [`SubstreamHandler`]s.
//!
//! This module is an attempt to simplify the implementation of protocols by freeing implementations from dealing with aspects such as concurrent substreams.
//! Particularly for outbound substreams, it greatly simplifies the definition of protocols through the [`FutureSubstream`] helper.
//!
//! At the moment, this module is an implementation detail of the rendezvous protocol but the intent is for it to be provided as a generic module that is accessible to other protocols as well.
use futures::future::{self, BoxFuture, Fuse, FusedFuture};
use futures::FutureExt;
use instant::Instant;
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, SubstreamProtocol,
};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::future::Future;
use std::hash::Hash;
use std::task::{Context, Poll};
use std::time::Duration;
use void::Void;
/// Handles a substream throughout its lifetime.
pub trait SubstreamHandler: Sized {
type InEvent;
type OutEvent;
type Error;
type OpenInfo;
fn upgrade(open_info: Self::OpenInfo)
-> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo>;
fn new(substream: Stream, info: Self::OpenInfo) -> Self;
fn on_event(self, event: Self::InEvent) -> Self;
fn advance(self, cx: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error>;
}
/// The result of advancing a [`SubstreamHandler`].
pub enum Next<TState, TEvent> {
/// Return the given event and set the handler into `next_state`.
EmitEvent { event: TEvent, next_state: TState },
/// The handler currently cannot do any more work, set its state back into `next_state`.
Pending { next_state: TState },
/// The handler performed some work and wants to continue in the given state.
///
/// This variant is useful because it frees the handler from implementing a loop internally.
Continue { next_state: TState },
/// The handler finished.
Done,
}
impl<TState, TEvent> Next<TState, TEvent> {
pub fn map_state<TNextState>(
self,
map: impl FnOnce(TState) -> TNextState,
) -> Next<TNextState, TEvent> {
match self {
Next::EmitEvent { event, next_state } => Next::EmitEvent {
event,
next_state: map(next_state),
},
Next::Pending { next_state } => Next::Pending {
next_state: map(next_state),
},
Next::Continue { next_state } => Next::Pending {
next_state: map(next_state),
},
Next::Done => Next::Done,
}
}
}
#[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)]
pub struct InboundSubstreamId(u64);
impl InboundSubstreamId {
fn fetch_and_increment(&mut self) -> Self {
let next_id = *self;
self.0 += 1;
next_id
}
}
impl fmt::Display for InboundSubstreamId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)]
pub struct OutboundSubstreamId(u64);
impl OutboundSubstreamId {
fn fetch_and_increment(&mut self) -> Self {
let next_id = *self;
self.0 += 1;
next_id
}
}
impl fmt::Display for OutboundSubstreamId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
pub struct PassthroughProtocol {
ident: Option<StreamProtocol>,
}
impl PassthroughProtocol {
pub fn new(ident: StreamProtocol) -> Self {
Self { ident: Some(ident) }
}
}
impl UpgradeInfo for PassthroughProtocol {
type Info = StreamProtocol;
type InfoIter = std::option::IntoIter<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
self.ident.clone().into_iter()
}
}
impl<C: Send + 'static> InboundUpgrade<C> for PassthroughProtocol {
type Output = C;
type Error = Void;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
match self.ident {
Some(_) => future::ready(Ok(socket)).boxed(),
None => future::pending().boxed(),
}
}
}
impl<C: Send + 'static> OutboundUpgrade<C> for PassthroughProtocol {
type Output = C;
type Error = Void;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
match self.ident {
Some(_) => future::ready(Ok(socket)).boxed(),
None => future::pending().boxed(),
}
}
}
/// An implementation of [`ConnectionHandler`] that delegates to individual [`SubstreamHandler`]s.
pub struct SubstreamConnectionHandler<TInboundSubstream, TOutboundSubstream, TOutboundOpenInfo> {
inbound_substreams: HashMap<InboundSubstreamId, TInboundSubstream>,
outbound_substreams: HashMap<OutboundSubstreamId, TOutboundSubstream>,
next_inbound_substream_id: InboundSubstreamId,
next_outbound_substream_id: OutboundSubstreamId,
new_substreams: VecDeque<TOutboundOpenInfo>,
initial_keep_alive_deadline: Instant,
}
impl<TInboundSubstream, TOutboundSubstream, TOutboundOpenInfo>
SubstreamConnectionHandler<TInboundSubstream, TOutboundSubstream, TOutboundOpenInfo>
{
pub fn new(initial_keep_alive: Duration) -> Self {
Self {
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
next_inbound_substream_id: InboundSubstreamId(0),
next_outbound_substream_id: OutboundSubstreamId(0),
new_substreams: Default::default(),
initial_keep_alive_deadline: Instant::now() + initial_keep_alive,
}
}
}
impl<TOutboundSubstream, TOutboundOpenInfo>
SubstreamConnectionHandler<void::Void, TOutboundSubstream, TOutboundOpenInfo>
{
pub fn new_outbound_only(initial_keep_alive: Duration) -> Self {
Self {
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
next_inbound_substream_id: InboundSubstreamId(0),
next_outbound_substream_id: OutboundSubstreamId(0),
new_substreams: Default::default(),
initial_keep_alive_deadline: Instant::now() + initial_keep_alive,
}
}
}
impl<TInboundSubstream, TOutboundOpenInfo>
SubstreamConnectionHandler<TInboundSubstream, void::Void, TOutboundOpenInfo>
{
pub fn new_inbound_only(initial_keep_alive: Duration) -> Self {
Self {
inbound_substreams: Default::default(),
outbound_substreams: Default::default(),
next_inbound_substream_id: InboundSubstreamId(0),
next_outbound_substream_id: OutboundSubstreamId(0),
new_substreams: Default::default(),
initial_keep_alive_deadline: Instant::now() + initial_keep_alive,
}
}
}
/// Poll all substreams within the given HashMap.
///
/// This is defined as a separate function because we call it with two different fields stored within [`SubstreamConnectionHandler`].
fn poll_substreams<TId, TSubstream, TError, TOutEvent>(
substreams: &mut HashMap<TId, TSubstream>,
cx: &mut Context<'_>,
) -> Poll<Result<(TId, TOutEvent), (TId, TError)>>
where
TSubstream: SubstreamHandler<OutEvent = TOutEvent, Error = TError>,
TId: Copy + Eq + Hash + fmt::Display,
{
let substream_ids = substreams.keys().copied().collect::<Vec<_>>();
'loop_substreams: for id in substream_ids {
let mut handler = substreams
.remove(&id)
.expect("we just got the key out of the map");
let (next_state, poll) = 'loop_handler: loop {
match handler.advance(cx) {
Ok(Next::EmitEvent { next_state, event }) => {
break (next_state, Poll::Ready(Ok((id, event))))
}
Ok(Next::Pending { next_state }) => break (next_state, Poll::Pending),
Ok(Next::Continue { next_state }) => {
handler = next_state;
continue 'loop_handler;
}
Ok(Next::Done) => {
log::debug!("Substream handler {} finished", id);
continue 'loop_substreams;
}
Err(e) => return Poll::Ready(Err((id, e))),
}
};
substreams.insert(id, next_state);
return poll;
}
Poll::Pending
}
/// Event sent from the [`libp2p_swarm::NetworkBehaviour`] to the [`SubstreamConnectionHandler`].
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
pub enum InEvent<I, TInboundEvent, TOutboundEvent> {
/// Open a new substream using the provided `open_info`.
///
/// For "client-server" protocols, this is typically the initial message to be sent to the other party.
NewSubstream { open_info: I },
NotifyInboundSubstream {
id: InboundSubstreamId,
message: TInboundEvent,
},
NotifyOutboundSubstream {
id: OutboundSubstreamId,
message: TOutboundEvent,
},
}
/// Event produced by the [`SubstreamConnectionHandler`] for the corresponding [`libp2p_swarm::NetworkBehaviour`].
#[derive(Debug)]
pub enum OutEvent<TInbound, TOutbound, TInboundError, TOutboundError> {
/// An inbound substream produced an event.
InboundEvent {
id: InboundSubstreamId,
message: TInbound,
},
/// An outbound substream produced an event.
OutboundEvent {
id: OutboundSubstreamId,
message: TOutbound,
},
/// An inbound substream errored irrecoverably.
InboundError {
id: InboundSubstreamId,
error: TInboundError,
},
/// An outbound substream errored irrecoverably.
OutboundError {
id: OutboundSubstreamId,
error: TOutboundError,
},
}
impl<
TInboundInEvent,
TInboundOutEvent,
TOutboundInEvent,
TOutboundOutEvent,
TOutboundOpenInfo,
TInboundError,
TOutboundError,
TInboundSubstreamHandler,
TOutboundSubstreamHandler,
> ConnectionHandler
for SubstreamConnectionHandler<
TInboundSubstreamHandler,
TOutboundSubstreamHandler,
TOutboundOpenInfo,
>
where
TInboundSubstreamHandler: SubstreamHandler<
InEvent = TInboundInEvent,
OutEvent = TInboundOutEvent,
Error = TInboundError,
OpenInfo = (),
>,
TOutboundSubstreamHandler: SubstreamHandler<
InEvent = TOutboundInEvent,
OutEvent = TOutboundOutEvent,
Error = TOutboundError,
OpenInfo = TOutboundOpenInfo,
>,
TInboundInEvent: fmt::Debug + Send + 'static,
TInboundOutEvent: fmt::Debug + Send + 'static,
TOutboundInEvent: fmt::Debug + Send + 'static,
TOutboundOutEvent: fmt::Debug + Send + 'static,
TOutboundOpenInfo: fmt::Debug + Send + 'static,
TInboundError: fmt::Debug + Send + 'static,
TOutboundError: fmt::Debug + Send + 'static,
TInboundSubstreamHandler: Send + 'static,
TOutboundSubstreamHandler: Send + 'static,
{
type FromBehaviour = InEvent<TOutboundOpenInfo, TInboundInEvent, TOutboundInEvent>;
type ToBehaviour = OutEvent<TInboundOutEvent, TOutboundOutEvent, TInboundError, TOutboundError>;
type Error = Void;
type InboundProtocol = PassthroughProtocol;
type OutboundProtocol = PassthroughProtocol;
type InboundOpenInfo = ();
type OutboundOpenInfo = TOutboundOpenInfo;
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
TInboundSubstreamHandler::upgrade(())
}
fn on_connection_event(
&mut self,
event: ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol, ..
}) => {
self.inbound_substreams.insert(
self.next_inbound_substream_id.fetch_and_increment(),
TInboundSubstreamHandler::new(protocol, ()),
);
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol,
info,
}) => {
self.outbound_substreams.insert(
self.next_outbound_substream_id.fetch_and_increment(),
TOutboundSubstreamHandler::new(protocol, info),
);
}
// TODO: Handle upgrade errors properly
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::DialUpgradeError(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
match event {
InEvent::NewSubstream { open_info } => self.new_substreams.push_back(open_info),
InEvent::NotifyInboundSubstream { id, message } => {
match self.inbound_substreams.remove(&id) {
Some(handler) => {
let new_handler = handler.on_event(message);
self.inbound_substreams.insert(id, new_handler);
}
None => {
log::debug!("Substream with ID {} not found", id);
}
}
}
InEvent::NotifyOutboundSubstream { id, message } => {
match self.outbound_substreams.remove(&id) {
Some(handler) => {
let new_handler = handler.on_event(message);
self.outbound_substreams.insert(id, new_handler);
}
None => {
log::debug!("Substream with ID {} not found", id);
}
}
}
}
}
fn connection_keep_alive(&self) -> KeepAlive {
// Rudimentary keep-alive handling, to be extended as needed as this abstraction is used more by other protocols.
if Instant::now() < self.initial_keep_alive_deadline {
return KeepAlive::Yes;
}
if self.inbound_substreams.is_empty()
&& self.outbound_substreams.is_empty()
&& self.new_substreams.is_empty()
{
return KeepAlive::No;
}
KeepAlive::Yes
}
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::ToBehaviour,
Self::Error,
>,
> {
if let Some(open_info) = self.new_substreams.pop_front() {
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: TOutboundSubstreamHandler::upgrade(open_info),
});
}
match poll_substreams(&mut self.inbound_substreams, cx) {
Poll::Ready(Ok((id, message))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
OutEvent::InboundEvent { id, message },
))
}
Poll::Ready(Err((id, error))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
OutEvent::InboundError { id, error },
))
}
Poll::Pending => {}
}
match poll_substreams(&mut self.outbound_substreams, cx) {
Poll::Ready(Ok((id, message))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
OutEvent::OutboundEvent { id, message },
))
}
Poll::Ready(Err((id, error))) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
OutEvent::OutboundError { id, error },
))
}
Poll::Pending => {}
}
Poll::Pending
}
}
/// A helper struct for substream handlers that can be implemented as async functions.
///
/// This only works for substreams without an `InEvent` because - once constructed - the state of an inner future is opaque.
pub(crate) struct FutureSubstream<TOutEvent, TError> {
future: Fuse<BoxFuture<'static, Result<TOutEvent, TError>>>,
}
impl<TOutEvent, TError> FutureSubstream<TOutEvent, TError> {
pub(crate) fn new(
future: impl Future<Output = Result<TOutEvent, TError>> + Send + 'static,
) -> Self {
Self {
future: future.boxed().fuse(),
}
}
pub(crate) fn advance(mut self, cx: &mut Context<'_>) -> Result<Next<Self, TOutEvent>, TError> {
if self.future.is_terminated() {
return Ok(Next::Done);
}
match self.future.poll_unpin(cx) {
Poll::Ready(Ok(event)) => Ok(Next::EmitEvent {
event,
next_state: self,
}),
Poll::Ready(Err(error)) => Err(error),
Poll::Pending => Ok(Next::Pending { next_state: self }),
}
}
}
impl SubstreamHandler for void::Void {
type InEvent = void::Void;
type OutEvent = void::Void;
type Error = void::Void;
type OpenInfo = ();
fn new(_: Stream, _: Self::OpenInfo) -> Self {
unreachable!("we should never yield a substream")
}
fn on_event(self, event: Self::InEvent) -> Self {
void::unreachable(event)
}
fn advance(self, _: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error> {
void::unreachable(self)
}
fn upgrade(
open_info: Self::OpenInfo,
) -> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo> {
SubstreamProtocol::new(PassthroughProtocol { ident: None }, open_info)
}
}