refactor(relay): revise public API to follow naming convention (#3238)

Continues addressing https://github.com/libp2p/rust-libp2p/issues/2217.
This commit is contained in:
João Oliveira
2023-01-02 20:21:46 +00:00
committed by GitHub
parent 68d0f882bd
commit 9c96bbb54b
24 changed files with 394 additions and 308 deletions

View File

@ -151,9 +151,9 @@ impl super::Recorder<libp2p_identify::Event> for Metrics {
#[cfg(feature = "ping")] #[cfg(feature = "ping")]
libp2p_ping::PROTOCOL_NAME, libp2p_ping::PROTOCOL_NAME,
#[cfg(feature = "relay")] #[cfg(feature = "relay")]
libp2p_relay::v2::STOP_PROTOCOL_NAME, libp2p_relay::STOP_PROTOCOL_NAME,
#[cfg(feature = "relay")] #[cfg(feature = "relay")]
libp2p_relay::v2::HOP_PROTOCOL_NAME, libp2p_relay::HOP_PROTOCOL_NAME,
]; ];
allowed_protocols.contains(&p.as_bytes()) allowed_protocols.contains(&p.as_bytes())

View File

@ -136,8 +136,8 @@ impl Recorder<libp2p_ping::Event> for Metrics {
} }
#[cfg(feature = "relay")] #[cfg(feature = "relay")]
impl Recorder<libp2p_relay::v2::relay::Event> for Metrics { impl Recorder<libp2p_relay::Event> for Metrics {
fn record(&self, event: &libp2p_relay::v2::relay::Event) { fn record(&self, event: &libp2p_relay::Event) {
self.relay.record(event) self.relay.record(event)
} }
} }

View File

@ -63,47 +63,35 @@ enum EventType {
CircuitClosed, CircuitClosed,
} }
impl From<&libp2p_relay::v2::relay::Event> for EventType { impl From<&libp2p_relay::Event> for EventType {
fn from(event: &libp2p_relay::v2::relay::Event) -> Self { fn from(event: &libp2p_relay::Event) -> Self {
match event { match event {
libp2p_relay::v2::relay::Event::ReservationReqAccepted { .. } => { libp2p_relay::Event::ReservationReqAccepted { .. } => EventType::ReservationReqAccepted,
EventType::ReservationReqAccepted libp2p_relay::Event::ReservationReqAcceptFailed { .. } => {
}
libp2p_relay::v2::relay::Event::ReservationReqAcceptFailed { .. } => {
EventType::ReservationReqAcceptFailed EventType::ReservationReqAcceptFailed
} }
libp2p_relay::v2::relay::Event::ReservationReqDenied { .. } => { libp2p_relay::Event::ReservationReqDenied { .. } => EventType::ReservationReqDenied,
EventType::ReservationReqDenied libp2p_relay::Event::ReservationReqDenyFailed { .. } => {
}
libp2p_relay::v2::relay::Event::ReservationReqDenyFailed { .. } => {
EventType::ReservationReqDenyFailed EventType::ReservationReqDenyFailed
} }
libp2p_relay::v2::relay::Event::ReservationTimedOut { .. } => { libp2p_relay::Event::ReservationTimedOut { .. } => EventType::ReservationTimedOut,
EventType::ReservationTimedOut libp2p_relay::Event::CircuitReqReceiveFailed { .. } => {
}
libp2p_relay::v2::relay::Event::CircuitReqReceiveFailed { .. } => {
EventType::CircuitReqReceiveFailed EventType::CircuitReqReceiveFailed
} }
libp2p_relay::v2::relay::Event::CircuitReqDenied { .. } => EventType::CircuitReqDenied, libp2p_relay::Event::CircuitReqDenied { .. } => EventType::CircuitReqDenied,
libp2p_relay::v2::relay::Event::CircuitReqOutboundConnectFailed { .. } => { libp2p_relay::Event::CircuitReqOutboundConnectFailed { .. } => {
EventType::CircuitReqOutboundConnectFailed EventType::CircuitReqOutboundConnectFailed
} }
libp2p_relay::v2::relay::Event::CircuitReqDenyFailed { .. } => { libp2p_relay::Event::CircuitReqDenyFailed { .. } => EventType::CircuitReqDenyFailed,
EventType::CircuitReqDenyFailed libp2p_relay::Event::CircuitReqAccepted { .. } => EventType::CircuitReqAccepted,
} libp2p_relay::Event::CircuitReqAcceptFailed { .. } => EventType::CircuitReqAcceptFailed,
libp2p_relay::v2::relay::Event::CircuitReqAccepted { .. } => { libp2p_relay::Event::CircuitClosed { .. } => EventType::CircuitClosed,
EventType::CircuitReqAccepted
}
libp2p_relay::v2::relay::Event::CircuitReqAcceptFailed { .. } => {
EventType::CircuitReqAcceptFailed
}
libp2p_relay::v2::relay::Event::CircuitClosed { .. } => EventType::CircuitClosed,
} }
} }
} }
impl super::Recorder<libp2p_relay::v2::relay::Event> for Metrics { impl super::Recorder<libp2p_relay::Event> for Metrics {
fn record(&self, event: &libp2p_relay::v2::relay::Event) { fn record(&self, event: &libp2p_relay::Event) {
self.events self.events
.get_or_create(&EventLabels { .get_or_create(&EventLabels {
event: event.into(), event: event.into(),

View File

@ -32,7 +32,7 @@ use libp2p_dns::DnsConfig;
use libp2p_identify as identify; use libp2p_identify as identify;
use libp2p_noise as noise; use libp2p_noise as noise;
use libp2p_ping as ping; use libp2p_ping as ping;
use libp2p_relay::v2::client::{self, Client}; use libp2p_relay as relay;
use libp2p_swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; use libp2p_swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent};
use libp2p_tcp as tcp; use libp2p_tcp as tcp;
use log::info; use log::info;
@ -87,7 +87,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let local_peer_id = PeerId::from(local_key.public()); let local_peer_id = PeerId::from(local_key.public());
info!("Local peer id: {:?}", local_peer_id); info!("Local peer id: {:?}", local_peer_id);
let (relay_transport, client) = Client::new_transport_and_behaviour(local_peer_id); let (relay_transport, client) = relay::client::new(local_peer_id);
let transport = OrTransport::new( let transport = OrTransport::new(
relay_transport, relay_transport,
@ -111,7 +111,7 @@ fn main() -> Result<(), Box<dyn Error>> {
prelude = "libp2p_swarm::derive_prelude" prelude = "libp2p_swarm::derive_prelude"
)] )]
struct Behaviour { struct Behaviour {
relay_client: Client, relay_client: relay::client::Behaviour,
ping: ping::Behaviour, ping: ping::Behaviour,
identify: identify::Behaviour, identify: identify::Behaviour,
dcutr: dcutr::Behaviour, dcutr: dcutr::Behaviour,
@ -122,7 +122,7 @@ fn main() -> Result<(), Box<dyn Error>> {
enum Event { enum Event {
Ping(ping::Event), Ping(ping::Event),
Identify(identify::Event), Identify(identify::Event),
Relay(client::Event), Relay(relay::client::Event),
Dcutr(dcutr::Event), Dcutr(dcutr::Event),
} }
@ -138,8 +138,8 @@ fn main() -> Result<(), Box<dyn Error>> {
} }
} }
impl From<client::Event> for Event { impl From<relay::client::Event> for Event {
fn from(e: client::Event) -> Self { fn from(e: relay::client::Event) -> Self {
Event::Relay(e) Event::Relay(e)
} }
} }
@ -252,9 +252,9 @@ fn main() -> Result<(), Box<dyn Error>> {
SwarmEvent::NewListenAddr { address, .. } => { SwarmEvent::NewListenAddr { address, .. } => {
info!("Listening on {:?}", address); info!("Listening on {:?}", address);
} }
SwarmEvent::Behaviour(Event::Relay(client::Event::ReservationReqAccepted { SwarmEvent::Behaviour(Event::Relay(
.. relay::client::Event::ReservationReqAccepted { .. },
})) => { )) => {
assert!(opts.mode == Mode::Listen); assert!(opts.mode == Mode::Listen);
info!("Relay accepted our reservation request."); info!("Relay accepted our reservation request.");
} }

View File

@ -31,8 +31,7 @@ use libp2p_core::PublicKey;
use libp2p_core::{identity, PeerId}; use libp2p_core::{identity, PeerId};
use libp2p_dcutr as dcutr; use libp2p_dcutr as dcutr;
use libp2p_plaintext::PlainText2Config; use libp2p_plaintext::PlainText2Config;
use libp2p_relay::v2::client; use libp2p_relay as relay;
use libp2p_relay::v2::relay;
use libp2p_swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p_swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent};
use std::time::Duration; use std::time::Duration;
@ -91,7 +90,7 @@ fn connect() {
)); ));
} }
fn build_relay() -> Swarm<relay::Relay> { fn build_relay() -> Swarm<relay::Behaviour> {
let local_key = identity::Keypair::generate_ed25519(); let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public(); let local_public_key = local_key.public();
let local_peer_id = local_public_key.to_peer_id(); let local_peer_id = local_public_key.to_peer_id();
@ -100,7 +99,7 @@ fn build_relay() -> Swarm<relay::Relay> {
Swarm::with_threadpool_executor( Swarm::with_threadpool_executor(
transport, transport,
relay::Relay::new( relay::Behaviour::new(
local_peer_id, local_peer_id,
relay::Config { relay::Config {
reservation_duration: Duration::from_secs(2), reservation_duration: Duration::from_secs(2),
@ -116,7 +115,7 @@ fn build_client() -> Swarm<Client> {
let local_public_key = local_key.public(); let local_public_key = local_key.public();
let local_peer_id = local_public_key.to_peer_id(); let local_peer_id = local_public_key.to_peer_id();
let (relay_transport, behaviour) = client::Client::new_transport_and_behaviour(local_peer_id); let (relay_transport, behaviour) = relay::client::new(local_peer_id);
let transport = build_transport( let transport = build_transport(
OrTransport::new(relay_transport, MemoryTransport::default()).boxed(), OrTransport::new(relay_transport, MemoryTransport::default()).boxed(),
local_public_key, local_public_key,
@ -153,18 +152,18 @@ where
prelude = "libp2p_swarm::derive_prelude" prelude = "libp2p_swarm::derive_prelude"
)] )]
struct Client { struct Client {
relay: client::Client, relay: relay::client::Behaviour,
dcutr: dcutr::Behaviour, dcutr: dcutr::Behaviour,
} }
#[derive(Debug)] #[derive(Debug)]
enum ClientEvent { enum ClientEvent {
Relay(client::Event), Relay(relay::client::Event),
Dcutr(dcutr::Event), Dcutr(dcutr::Event),
} }
impl From<client::Event> for ClientEvent { impl From<relay::client::Event> for ClientEvent {
fn from(event: client::Event) -> Self { fn from(event: relay::client::Event) -> Self {
ClientEvent::Relay(event) ClientEvent::Relay(event)
} }
} }
@ -198,11 +197,13 @@ async fn wait_for_reservation(
break; break;
} }
} }
SwarmEvent::Behaviour(ClientEvent::Relay(client::Event::ReservationReqAccepted { SwarmEvent::Behaviour(ClientEvent::Relay(
relay_peer_id: peer_id, relay::client::Event::ReservationReqAccepted {
renewal, relay_peer_id: peer_id,
.. renewal,
})) if relay_peer_id == peer_id && renewal == is_renewal => { ..
},
)) if relay_peer_id == peer_id && renewal == is_renewal => {
reservation_req_accepted = true; reservation_req_accepted = true;
if new_listen_addr_for_relayed_addr { if new_listen_addr_for_relayed_addr {
break; break;
@ -226,7 +227,7 @@ async fn wait_for_connection_established(client: &mut Swarm<Client>, addr: &Mult
} }
SwarmEvent::Dialing(_) => {} SwarmEvent::Dialing(_) => {}
SwarmEvent::Behaviour(ClientEvent::Relay( SwarmEvent::Behaviour(ClientEvent::Relay(
client::Event::OutboundCircuitEstablished { .. }, relay::client::Event::OutboundCircuitEstablished { .. },
)) => {} )) => {}
SwarmEvent::ConnectionEstablished { .. } => {} SwarmEvent::ConnectionEstablished { .. } => {}
e => panic!("{e:?}"), e => panic!("{e:?}"),

View File

@ -1,9 +1,19 @@
# 0.15.0 [unreleased] # 0.15.0 [unreleased]
- Rename types as per [discussion 2174].
`Relay` has been renamed to `Behaviour`.
The `Relay`, and `Client` prefixes have been removed from various types like `ClientTransport`.
the `v2` namespace has also been removed, users should prefer importing the relay protocol as a module (`use libp2p::relay;`),
and refer to its types via `relay::`. For example: `relay::Behaviour` or `relay::client::Behaviour`.
See [PR 3238].
- Update to `libp2p-core` `v0.39.0`. - Update to `libp2p-core` `v0.39.0`.
- Update to `libp2p-swarm` `v0.42.0`. - Update to `libp2p-swarm` `v0.42.0`.
[PR 3238]: https://github.com/libp2p/rust-libp2p/pull/3238
[discussion 2174]: https://github.com/libp2p/rust-libp2p/issues/2174
# 0.14.0 # 0.14.0
- Update to `prost-codec` `v0.3.0`. - Update to `prost-codec` `v0.3.0`.

View File

@ -19,5 +19,5 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
fn main() { fn main() {
prost_build::compile_protos(&["src/v2/message.proto"], &["src/v2"]).unwrap(); prost_build::compile_protos(&["src/message.proto"], &["src/"]).unwrap();
} }

View File

@ -28,7 +28,7 @@ use libp2p_core::{identity, Multiaddr, PeerId, Transport};
use libp2p_identify as identify; use libp2p_identify as identify;
use libp2p_noise as noise; use libp2p_noise as noise;
use libp2p_ping as ping; use libp2p_ping as ping;
use libp2p_relay::v2::relay::{self, Relay}; use libp2p_relay as relay;
use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent};
use libp2p_tcp as tcp; use libp2p_tcp as tcp;
use std::error::Error; use std::error::Error;
@ -57,7 +57,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.boxed(); .boxed();
let behaviour = Behaviour { let behaviour = Behaviour {
relay: Relay::new(local_peer_id, Default::default()), relay: relay::Behaviour::new(local_peer_id, Default::default()),
ping: ping::Behaviour::new(ping::Config::new()), ping: ping::Behaviour::new(ping::Config::new()),
identify: identify::Behaviour::new(identify::Config::new( identify: identify::Behaviour::new(identify::Config::new(
"/TODO/0.0.1".to_string(), "/TODO/0.0.1".to_string(),
@ -79,7 +79,7 @@ fn main() -> Result<(), Box<dyn Error>> {
block_on(async { block_on(async {
loop { loop {
match swarm.next().await.expect("Infinite Stream.") { match swarm.next().await.expect("Infinite Stream.") {
SwarmEvent::Behaviour(Event::Relay(event)) => { SwarmEvent::Behaviour(event) => {
println!("{event:?}") println!("{event:?}")
} }
SwarmEvent::NewListenAddr { address, .. } => { SwarmEvent::NewListenAddr { address, .. } => {
@ -92,42 +92,13 @@ fn main() -> Result<(), Box<dyn Error>> {
} }
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour( #[behaviour(prelude = "libp2p_swarm::derive_prelude")]
out_event = "Event",
event_process = false,
prelude = "libp2p_swarm::derive_prelude"
)]
struct Behaviour { struct Behaviour {
relay: Relay, relay: relay::Behaviour,
ping: ping::Behaviour, ping: ping::Behaviour,
identify: identify::Behaviour, identify: identify::Behaviour,
} }
#[derive(Debug)]
enum Event {
Ping(ping::Event),
Identify(identify::Event),
Relay(relay::Event),
}
impl From<ping::Event> for Event {
fn from(e: ping::Event) -> Self {
Event::Ping(e)
}
}
impl From<identify::Event> for Event {
fn from(e: identify::Event) -> Self {
Event::Identify(e)
}
}
impl From<relay::Event> for Event {
fn from(e: relay::Event) -> Self {
Event::Relay(e)
}
}
fn generate_ed25519(secret_key_seed: u8) -> identity::Keypair { fn generate_ed25519(secret_key_seed: u8) -> identity::Keypair {
let mut bytes = [0u8; 32]; let mut bytes = [0u8; 32];
bytes[0] = secret_key_seed; bytes[0] = secret_key_seed;

View File

@ -23,8 +23,8 @@
mod handler; mod handler;
pub mod rate_limiter; pub mod rate_limiter;
use crate::v2::message_proto; use crate::message_proto;
use crate::v2::protocol::inbound_hop; use crate::protocol::{inbound_hop, outbound_stop};
use either::Either; use either::Either;
use instant::Instant; use instant::Instant;
use libp2p_core::connection::ConnectionId; use libp2p_core::connection::ConnectionId;
@ -41,9 +41,7 @@ use std::ops::Add;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use super::protocol::outbound_stop; /// Configuration for the relay [`Behaviour`].
/// Configuration for the [`Relay`] [`NetworkBehaviour`].
/// ///
/// # Panics /// # Panics
/// ///
@ -126,7 +124,7 @@ impl Default for Config {
} }
} }
/// The events produced by the [`Relay`] behaviour. /// The events produced by the relay `Behaviour`.
#[derive(Debug)] #[derive(Debug)]
pub enum Event { pub enum Event {
/// An inbound reservation request has been accepted. /// An inbound reservation request has been accepted.
@ -189,9 +187,9 @@ pub enum Event {
}, },
} }
/// [`Relay`] is a [`NetworkBehaviour`] that implements the relay server /// [`NetworkBehaviour`] implementation of the relay server
/// functionality of the circuit relay v2 protocol. /// functionality of the circuit relay v2 protocol.
pub struct Relay { pub struct Behaviour {
config: Config, config: Config,
local_peer_id: PeerId, local_peer_id: PeerId,
@ -205,7 +203,7 @@ pub struct Relay {
external_addresses: ExternalAddresses, external_addresses: ExternalAddresses,
} }
impl Relay { impl Behaviour {
pub fn new(local_peer_id: PeerId, config: Config) -> Self { pub fn new(local_peer_id: PeerId, config: Config) -> Self {
Self { Self {
config, config,
@ -251,7 +249,7 @@ impl Relay {
} }
} }
impl NetworkBehaviour for Relay { impl NetworkBehaviour for Behaviour {
type ConnectionHandler = handler::Prototype; type ConnectionHandler = handler::Prototype;
type OutEvent = Event; type OutEvent = Event;
@ -745,7 +743,7 @@ impl Add<u64> for CircuitId {
} }
/// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] /// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`]
/// before being returned in [`Relay::poll`]. /// before being returned in [`Behaviour::poll`].
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
enum Action { enum Action {
Done(NetworkBehaviourAction<Event, handler::Prototype>), Done(NetworkBehaviourAction<Event, handler::Prototype>),

View File

@ -18,10 +18,10 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::v2::copy_future::CopyFuture; use crate::behaviour::CircuitId;
use crate::v2::message_proto::Status; use crate::copy_future::CopyFuture;
use crate::v2::protocol::{inbound_hop, outbound_stop}; use crate::message_proto::Status;
use crate::v2::relay::CircuitId; use crate::protocol::{inbound_hop, outbound_stop};
use bytes::Bytes; use bytes::Bytes;
use either::Either; use either::Either;
use futures::channel::oneshot::{self, Canceled}; use futures::channel::oneshot::{self, Canceled};

View File

@ -23,9 +23,62 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
mod behaviour;
mod copy_future;
mod priv_client;
mod protocol;
pub mod v2; pub mod v2;
#[allow(clippy::derive_partial_eq_without_eq)]
mod message_proto {
include!(concat!(env!("OUT_DIR"), "/message_v2.pb.rs"));
}
pub use behaviour::{Behaviour, CircuitId, Config, Event};
pub use protocol::{HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME};
/// Types related to the relay protocol inbound.
pub mod inbound {
pub mod hop {
pub use crate::protocol::inbound_hop::FatalUpgradeError;
}
pub mod stop {
pub use crate::protocol::inbound_stop::FatalUpgradeError;
}
}
/// Types related to the relay protocol outbound.
pub mod outbound {
pub mod hop {
pub use crate::protocol::outbound_hop::FatalUpgradeError;
}
pub mod stop {
pub use crate::protocol::outbound_stop::FatalUpgradeError;
}
}
/// Everything related to the relay protocol from a client's perspective.
pub mod client {
pub use crate::priv_client::{new, transport::Transport, Behaviour, Connection, Event};
pub mod transport {
pub use crate::priv_client::transport::Error;
}
}
// Check that we can safely cast a `usize` to a `u64`. // Check that we can safely cast a `usize` to a `u64`.
static_assertions::const_assert! { static_assertions::const_assert! {
std::mem::size_of::<usize>() <= std::mem::size_of::<u64>() std::mem::size_of::<usize>() <= std::mem::size_of::<u64>()
} }
/// The ID of an outgoing / incoming, relay / destination request.
// TODO remove this type and it's usage on `TransportToBehaviourMsg::DialReq`
// when removed from `v2`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct RequestId(u64);
impl RequestId {
fn new() -> RequestId {
RequestId(rand::random())
}
}

View File

@ -21,9 +21,9 @@
//! [`NetworkBehaviour`] to act as a circuit relay v2 **client**. //! [`NetworkBehaviour`] to act as a circuit relay v2 **client**.
mod handler; mod handler;
pub mod transport; pub(crate) mod transport;
use crate::v2::protocol::{self, inbound_stop, outbound_hop}; use crate::protocol::{self, inbound_stop, outbound_hop};
use bytes::Bytes; use bytes::Bytes;
use either::Either; use either::Either;
use futures::channel::mpsc::Receiver; use futures::channel::mpsc::Receiver;
@ -45,8 +45,9 @@ use std::io::{Error, ErrorKind, IoSlice};
use std::ops::DerefMut; use std::ops::DerefMut;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use transport::Transport;
/// The events produced by the [`Client`] behaviour. /// The events produced by the client `Behaviour`.
#[derive(Debug)] #[derive(Debug)]
pub enum Event { pub enum Event {
/// An outbound reservation has been accepted. /// An outbound reservation has been accepted.
@ -88,7 +89,9 @@ pub enum Event {
}, },
} }
pub struct Client { /// [`NetworkBehaviour`] implementation of the relay client
/// functionality of the circuit relay v2 protocol.
pub struct Behaviour {
local_peer_id: PeerId, local_peer_id: PeerId,
from_transport: Receiver<transport::TransportToBehaviourMsg>, from_transport: Receiver<transport::TransportToBehaviourMsg>,
@ -100,18 +103,22 @@ pub struct Client {
queued_actions: VecDeque<Event>, queued_actions: VecDeque<Event>,
} }
impl Client { /// Create a new client relay [`Behaviour`] with it's corresponding [`Transport`].
pub fn new_transport_and_behaviour( pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) {
local_peer_id: PeerId, let (transport, from_transport) = Transport::new();
) -> (transport::ClientTransport, Self) { let behaviour = Behaviour {
let (transport, from_transport) = transport::ClientTransport::new(); local_peer_id,
let behaviour = Client { from_transport,
local_peer_id, directly_connected_peers: Default::default(),
from_transport, queued_actions: Default::default(),
directly_connected_peers: Default::default(), };
queued_actions: Default::default(), (transport, behaviour)
}; }
(transport, behaviour)
impl Behaviour {
#[deprecated(since = "0.15.0", note = "Use libp2p_relay::client::new instead.")]
pub fn new_transport_and_behaviour(local_peer_id: PeerId) -> (transport::Transport, Self) {
new(local_peer_id)
} }
fn on_connection_closed( fn on_connection_closed(
@ -145,7 +152,7 @@ impl Client {
} }
} }
impl NetworkBehaviour for Client { impl NetworkBehaviour for Behaviour {
type ConnectionHandler = handler::Prototype; type ConnectionHandler = handler::Prototype;
type OutEvent = Event; type OutEvent = Event;
@ -322,10 +329,10 @@ impl NetworkBehaviour for Client {
} }
} }
None => unreachable!( None => unreachable!(
"`Relay` `NetworkBehaviour` polled after channel from \ "`relay::Behaviour` polled after channel from \
`RelayTransport` has been closed. Unreachable under \ `Transport` has been closed. Unreachable under \
the assumption that the `Client` is never polled after \ the assumption that the `client::Behaviour` is never polled after \
`ClientTransport` is dropped.", `client::Transport` is dropped.",
), ),
}; };
@ -333,10 +340,10 @@ impl NetworkBehaviour for Client {
} }
} }
/// A [`NegotiatedSubstream`] acting as a [`RelayedConnection`]. /// A [`NegotiatedSubstream`] acting as a [`Connection`].
pub enum RelayedConnection { pub enum Connection {
InboundAccepting { InboundAccepting {
accept: BoxFuture<'static, Result<RelayedConnection, Error>>, accept: BoxFuture<'static, Result<Connection, Error>>,
}, },
Operational { Operational {
read_buffer: Bytes, read_buffer: Bytes,
@ -345,20 +352,20 @@ pub enum RelayedConnection {
}, },
} }
impl Unpin for RelayedConnection {} impl Unpin for Connection {}
impl RelayedConnection { impl Connection {
pub(crate) fn new_inbound( pub(crate) fn new_inbound(
circuit: inbound_stop::Circuit, circuit: inbound_stop::Circuit,
drop_notifier: oneshot::Sender<void::Void>, drop_notifier: oneshot::Sender<void::Void>,
) -> Self { ) -> Self {
RelayedConnection::InboundAccepting { Connection::InboundAccepting {
accept: async { accept: async {
let (substream, read_buffer) = circuit let (substream, read_buffer) = circuit
.accept() .accept()
.await .await
.map_err(|e| Error::new(ErrorKind::Other, e))?; .map_err(|e| Error::new(ErrorKind::Other, e))?;
Ok(RelayedConnection::Operational { Ok(Connection::Operational {
read_buffer, read_buffer,
substream, substream,
drop_notifier, drop_notifier,
@ -373,7 +380,7 @@ impl RelayedConnection {
read_buffer: Bytes, read_buffer: Bytes,
drop_notifier: oneshot::Sender<void::Void>, drop_notifier: oneshot::Sender<void::Void>,
) -> Self { ) -> Self {
RelayedConnection::Operational { Connection::Operational {
substream, substream,
read_buffer, read_buffer,
drop_notifier, drop_notifier,
@ -381,7 +388,7 @@ impl RelayedConnection {
} }
} }
impl AsyncWrite for RelayedConnection { impl AsyncWrite for Connection {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context, cx: &mut Context,
@ -389,10 +396,10 @@ impl AsyncWrite for RelayedConnection {
) -> Poll<Result<usize, Error>> { ) -> Poll<Result<usize, Error>> {
loop { loop {
match self.deref_mut() { match self.deref_mut() {
RelayedConnection::InboundAccepting { accept } => { Connection::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?; *self = ready!(accept.poll_unpin(cx))?;
} }
RelayedConnection::Operational { substream, .. } => { Connection::Operational { substream, .. } => {
return Pin::new(substream).poll_write(cx, buf); return Pin::new(substream).poll_write(cx, buf);
} }
} }
@ -401,10 +408,10 @@ impl AsyncWrite for RelayedConnection {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
loop { loop {
match self.deref_mut() { match self.deref_mut() {
RelayedConnection::InboundAccepting { accept } => { Connection::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?; *self = ready!(accept.poll_unpin(cx))?;
} }
RelayedConnection::Operational { substream, .. } => { Connection::Operational { substream, .. } => {
return Pin::new(substream).poll_flush(cx); return Pin::new(substream).poll_flush(cx);
} }
} }
@ -413,10 +420,10 @@ impl AsyncWrite for RelayedConnection {
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> { fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
loop { loop {
match self.deref_mut() { match self.deref_mut() {
RelayedConnection::InboundAccepting { accept } => { Connection::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?; *self = ready!(accept.poll_unpin(cx))?;
} }
RelayedConnection::Operational { substream, .. } => { Connection::Operational { substream, .. } => {
return Pin::new(substream).poll_close(cx); return Pin::new(substream).poll_close(cx);
} }
} }
@ -430,10 +437,10 @@ impl AsyncWrite for RelayedConnection {
) -> Poll<Result<usize, Error>> { ) -> Poll<Result<usize, Error>> {
loop { loop {
match self.deref_mut() { match self.deref_mut() {
RelayedConnection::InboundAccepting { accept } => { Connection::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?; *self = ready!(accept.poll_unpin(cx))?;
} }
RelayedConnection::Operational { substream, .. } => { Connection::Operational { substream, .. } => {
return Pin::new(substream).poll_write_vectored(cx, bufs); return Pin::new(substream).poll_write_vectored(cx, bufs);
} }
} }
@ -441,7 +448,7 @@ impl AsyncWrite for RelayedConnection {
} }
} }
impl AsyncRead for RelayedConnection { impl AsyncRead for Connection {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -449,10 +456,10 @@ impl AsyncRead for RelayedConnection {
) -> Poll<Result<usize, Error>> { ) -> Poll<Result<usize, Error>> {
loop { loop {
match self.deref_mut() { match self.deref_mut() {
RelayedConnection::InboundAccepting { accept } => { Connection::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?; *self = ready!(accept.poll_unpin(cx))?;
} }
RelayedConnection::Operational { Connection::Operational {
read_buffer, read_buffer,
substream, substream,
.. ..

View File

@ -18,9 +18,9 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::v2::client::transport; use crate::message_proto::Status;
use crate::v2::message_proto::Status; use crate::priv_client::transport;
use crate::v2::protocol::{self, inbound_stop, outbound_hop}; use crate::protocol::{self, inbound_stop, outbound_hop};
use either::Either; use either::Either;
use futures::channel::{mpsc, oneshot}; use futures::channel::{mpsc, oneshot};
use futures::future::{BoxFuture, FutureExt}; use futures::future::{BoxFuture, FutureExt};
@ -56,7 +56,7 @@ pub enum In {
}, },
EstablishCircuit { EstablishCircuit {
dst_peer_id: PeerId, dst_peer_id: PeerId,
send_back: oneshot::Sender<Result<super::RelayedConnection, ()>>, send_back: oneshot::Sender<Result<super::Connection, ()>>,
}, },
} }
@ -112,7 +112,7 @@ pub enum Event {
pub struct Prototype { pub struct Prototype {
local_peer_id: PeerId, local_peer_id: PeerId,
/// Initial [`In`] event from [`super::Client`] provided at creation time. /// Initial [`In`] event from [`super::Behaviour`] provided at creation time.
initial_in: Option<In>, initial_in: Option<In>,
} }
@ -231,7 +231,7 @@ impl Handler {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.alive_lend_out_substreams.push(rx); self.alive_lend_out_substreams.push(rx);
let connection = super::RelayedConnection::new_inbound(inbound_circuit, tx); let connection = super::Connection::new_inbound(inbound_circuit, tx);
pending_msgs.push_back(transport::ToListenerMsg::IncomingRelayedConnection { pending_msgs.push_back(transport::ToListenerMsg::IncomingRelayedConnection {
stream: connection, stream: connection,
@ -316,7 +316,7 @@ impl Handler {
OutboundOpenInfo::Connect { send_back }, OutboundOpenInfo::Connect { send_back },
) => { ) => {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
match send_back.send(Ok(super::RelayedConnection::new_outbound( match send_back.send(Ok(super::Connection::new_outbound(
substream, substream,
read_buffer, read_buffer,
tx, tx,
@ -328,7 +328,7 @@ impl Handler {
)); ));
} }
Err(_) => debug!( Err(_) => debug!(
"Oneshot to `RelayedDial` future dropped. \ "Oneshot to `client::transport::Dial` future dropped. \
Dropping established relayed connection to {:?}.", Dropping established relayed connection to {:?}.",
self.remote_peer_id, self.remote_peer_id,
), ),
@ -795,6 +795,6 @@ pub enum OutboundOpenInfo {
to_listener: mpsc::Sender<transport::ToListenerMsg>, to_listener: mpsc::Sender<transport::ToListenerMsg>,
}, },
Connect { Connect {
send_back: oneshot::Sender<Result<super::RelayedConnection, ()>>, send_back: oneshot::Sender<Result<super::Connection, ()>>,
}, },
} }

View File

@ -19,8 +19,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::v2::client::RelayedConnection; use crate::priv_client::Connection;
use crate::v2::RequestId; use crate::RequestId;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::channel::oneshot; use futures::channel::oneshot;
use futures::future::{ready, BoxFuture, FutureExt, Ready}; use futures::future::{ready, BoxFuture, FutureExt, Ready};
@ -30,7 +30,7 @@ use futures::stream::SelectAll;
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use libp2p_core::multiaddr::{Multiaddr, Protocol}; use libp2p_core::multiaddr::{Multiaddr, Protocol};
use libp2p_core::transport::{ListenerId, TransportError, TransportEvent}; use libp2p_core::transport::{ListenerId, TransportError, TransportEvent};
use libp2p_core::{PeerId, Transport}; use libp2p_core::PeerId;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -50,9 +50,9 @@ use thiserror::Error;
/// # use libp2p_core::{Multiaddr, multiaddr::{Protocol}, Transport, PeerId}; /// # use libp2p_core::{Multiaddr, multiaddr::{Protocol}, Transport, PeerId};
/// # use libp2p_core::transport::memory::MemoryTransport; /// # use libp2p_core::transport::memory::MemoryTransport;
/// # use libp2p_core::transport::choice::OrTransport; /// # use libp2p_core::transport::choice::OrTransport;
/// # use libp2p_relay::v2::client; /// # use libp2p_relay as relay;
/// let actual_transport = MemoryTransport::default(); /// let actual_transport = MemoryTransport::default();
/// let (relay_transport, behaviour) = client::Client::new_transport_and_behaviour( /// let (relay_transport, behaviour) = relay::client::new(
/// PeerId::random(), /// PeerId::random(),
/// ); /// );
/// let mut transport = OrTransport::new(relay_transport, actual_transport); /// let mut transport = OrTransport::new(relay_transport, actual_transport);
@ -72,11 +72,11 @@ use thiserror::Error;
/// # use libp2p_core::{Multiaddr, multiaddr::{Protocol}, Transport, PeerId}; /// # use libp2p_core::{Multiaddr, multiaddr::{Protocol}, Transport, PeerId};
/// # use libp2p_core::transport::memory::MemoryTransport; /// # use libp2p_core::transport::memory::MemoryTransport;
/// # use libp2p_core::transport::choice::OrTransport; /// # use libp2p_core::transport::choice::OrTransport;
/// # use libp2p_relay::v2::client; /// # use libp2p_relay as relay;
/// # let relay_id = PeerId::random(); /// # let relay_id = PeerId::random();
/// # let local_peer_id = PeerId::random(); /// # let local_peer_id = PeerId::random();
/// let actual_transport = MemoryTransport::default(); /// let actual_transport = MemoryTransport::default();
/// let (relay_transport, behaviour) = client::Client::new_transport_and_behaviour( /// let (relay_transport, behaviour) = relay::client::new(
/// local_peer_id, /// local_peer_id,
/// ); /// );
/// let mut transport = OrTransport::new(relay_transport, actual_transport); /// let mut transport = OrTransport::new(relay_transport, actual_transport);
@ -86,35 +86,16 @@ use thiserror::Error;
/// .with(Protocol::P2pCircuit); // Signal to listen via remote relay node. /// .with(Protocol::P2pCircuit); // Signal to listen via remote relay node.
/// transport.listen_on(relay_addr).unwrap(); /// transport.listen_on(relay_addr).unwrap();
/// ``` /// ```
pub struct ClientTransport { pub struct Transport {
to_behaviour: mpsc::Sender<TransportToBehaviourMsg>, to_behaviour: mpsc::Sender<TransportToBehaviourMsg>,
pending_to_behaviour: VecDeque<TransportToBehaviourMsg>, pending_to_behaviour: VecDeque<TransportToBehaviourMsg>,
listeners: SelectAll<RelayListener>, listeners: SelectAll<Listener>,
} }
impl ClientTransport { impl Transport {
/// Create a new [`ClientTransport`].
///
/// Note: The transport only handles listening and dialing on relayed [`Multiaddr`], and depends on
/// an other transport to do the actual transmission of data. They should be combined through the
/// [`OrTransport`](libp2p_core::transport::choice::OrTransport).
///
/// ```
/// # use libp2p_core::{Multiaddr, multiaddr::{Protocol}, Transport, PeerId};
/// # use libp2p_core::transport::memory::MemoryTransport;
/// # use libp2p_core::transport::choice::OrTransport;
/// # use libp2p_relay::v2::client;
/// let actual_transport = MemoryTransport::default();
/// let (relay_transport, behaviour) = client::Client::new_transport_and_behaviour(
/// PeerId::random(),
/// );
///
/// // To reduce unnecessary connection attempts, put `relay_transport` first.
/// let mut transport = OrTransport::new(relay_transport, actual_transport);
/// ```
pub(crate) fn new() -> (Self, mpsc::Receiver<TransportToBehaviourMsg>) { pub(crate) fn new() -> (Self, mpsc::Receiver<TransportToBehaviourMsg>) {
let (to_behaviour, from_transport) = mpsc::channel(0); let (to_behaviour, from_transport) = mpsc::channel(0);
let transport = ClientTransport { let transport = Transport {
to_behaviour, to_behaviour,
pending_to_behaviour: VecDeque::new(), pending_to_behaviour: VecDeque::new(),
listeners: SelectAll::new(), listeners: SelectAll::new(),
@ -123,11 +104,11 @@ impl ClientTransport {
} }
} }
impl Transport for ClientTransport { impl libp2p_core::Transport for Transport {
type Output = RelayedConnection; type Output = Connection;
type Error = RelayError; type Error = Error;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>; type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
type Dial = RelayedDial; type Dial = BoxFuture<'static, Result<Connection, Error>>;
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> { fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
let (relay_peer_id, relay_addr) = match parse_relayed_multiaddr(addr)? { let (relay_peer_id, relay_addr) = match parse_relayed_multiaddr(addr)? {
@ -135,12 +116,12 @@ impl Transport for ClientTransport {
relay_peer_id: None, relay_peer_id: None,
relay_addr: _, relay_addr: _,
.. ..
} => return Err(RelayError::MissingDstPeerId.into()), } => return Err(Error::MissingDstPeerId.into()),
RelayedMultiaddr { RelayedMultiaddr {
relay_peer_id: _, relay_peer_id: _,
relay_addr: None, relay_addr: None,
.. ..
} => return Err(RelayError::MissingRelayAddr.into()), } => return Err(Error::MissingRelayAddr.into()),
RelayedMultiaddr { RelayedMultiaddr {
relay_peer_id: Some(peer_id), relay_peer_id: Some(peer_id),
relay_addr: Some(addr), relay_addr: Some(addr),
@ -157,7 +138,7 @@ impl Transport for ClientTransport {
}); });
let listener_id = ListenerId::new(); let listener_id = ListenerId::new();
let listener = RelayListener { let listener = Listener {
listener_id, listener_id,
queued_events: Default::default(), queued_events: Default::default(),
from_behaviour, from_behaviour,
@ -185,9 +166,9 @@ impl Transport for ClientTransport {
} = parse_relayed_multiaddr(addr)?; } = parse_relayed_multiaddr(addr)?;
// TODO: In the future we might want to support dialing a relay by its address only. // TODO: In the future we might want to support dialing a relay by its address only.
let relay_peer_id = relay_peer_id.ok_or(RelayError::MissingRelayPeerId)?; let relay_peer_id = relay_peer_id.ok_or(Error::MissingRelayPeerId)?;
let relay_addr = relay_addr.ok_or(RelayError::MissingRelayAddr)?; let relay_addr = relay_addr.ok_or(Error::MissingRelayAddr)?;
let dst_peer_id = dst_peer_id.ok_or(RelayError::MissingDstPeerId)?; let dst_peer_id = dst_peer_id.ok_or(Error::MissingDstPeerId)?;
let mut to_behaviour = self.to_behaviour.clone(); let mut to_behaviour = self.to_behaviour.clone();
Ok(async move { Ok(async move {
@ -202,7 +183,7 @@ impl Transport for ClientTransport {
send_back: tx, send_back: tx,
}) })
.await?; .await?;
let stream = rx.await?.map_err(|()| RelayError::Connect)?; let stream = rx.await?.map_err(|()| Error::Connect)?;
Ok(stream) Ok(stream)
} }
.boxed()) .boxed())
@ -265,9 +246,7 @@ struct RelayedMultiaddr {
} }
/// Parse a [`Multiaddr`] containing a [`Protocol::P2pCircuit`]. /// Parse a [`Multiaddr`] containing a [`Protocol::P2pCircuit`].
fn parse_relayed_multiaddr( fn parse_relayed_multiaddr(addr: Multiaddr) -> Result<RelayedMultiaddr, TransportError<Error>> {
addr: Multiaddr,
) -> Result<RelayedMultiaddr, TransportError<RelayError>> {
if !addr.iter().any(|p| matches!(p, Protocol::P2pCircuit)) { if !addr.iter().any(|p| matches!(p, Protocol::P2pCircuit)) {
return Err(TransportError::MultiaddrNotSupported(addr)); return Err(TransportError::MultiaddrNotSupported(addr));
} }
@ -281,20 +260,20 @@ fn parse_relayed_multiaddr(
if before_circuit { if before_circuit {
before_circuit = false; before_circuit = false;
} else { } else {
return Err(RelayError::MultipleCircuitRelayProtocolsUnsupported.into()); return Err(Error::MultipleCircuitRelayProtocolsUnsupported.into());
} }
} }
Protocol::P2p(hash) => { Protocol::P2p(hash) => {
let peer_id = PeerId::from_multihash(hash).map_err(|_| RelayError::InvalidHash)?; let peer_id = PeerId::from_multihash(hash).map_err(|_| Error::InvalidHash)?;
if before_circuit { if before_circuit {
if relayed_multiaddr.relay_peer_id.is_some() { if relayed_multiaddr.relay_peer_id.is_some() {
return Err(RelayError::MalformedMultiaddr.into()); return Err(Error::MalformedMultiaddr.into());
} }
relayed_multiaddr.relay_peer_id = Some(peer_id) relayed_multiaddr.relay_peer_id = Some(peer_id)
} else { } else {
if relayed_multiaddr.dst_peer_id.is_some() { if relayed_multiaddr.dst_peer_id.is_some() {
return Err(RelayError::MalformedMultiaddr.into()); return Err(Error::MalformedMultiaddr.into());
} }
relayed_multiaddr.dst_peer_id = Some(peer_id) relayed_multiaddr.dst_peer_id = Some(peer_id)
} }
@ -318,24 +297,24 @@ fn parse_relayed_multiaddr(
Ok(relayed_multiaddr) Ok(relayed_multiaddr)
} }
pub struct RelayListener { pub struct Listener {
listener_id: ListenerId, listener_id: ListenerId,
/// Queue of events to report when polled. /// Queue of events to report when polled.
queued_events: VecDeque<<Self as Stream>::Item>, queued_events: VecDeque<<Self as Stream>::Item>,
/// Channel for messages from the behaviour [`Handler`][super::handler::Handler]. /// Channel for messages from the behaviour [`Handler`][super::handler::Handler].
from_behaviour: mpsc::Receiver<ToListenerMsg>, from_behaviour: mpsc::Receiver<ToListenerMsg>,
/// The listener can be closed either manually with [`Transport::remove_listener`] or if /// The listener can be closed either manually with [`Transport::remove_listener`](libp2p_core::Transport) or if
/// the sender side of the `from_behaviour` channel is dropped. /// the sender side of the `from_behaviour` channel is dropped.
is_closed: bool, is_closed: bool,
} }
impl RelayListener { impl Listener {
/// Close the listener. /// Close the listener.
/// ///
/// This will create a [`TransportEvent::ListenerClosed`] event /// This will create a [`TransportEvent::ListenerClosed`] event
/// and terminate the stream once all remaining events in queue have /// and terminate the stream once all remaining events in queue have
/// been reported. /// been reported.
fn close(&mut self, reason: Result<(), RelayError>) { fn close(&mut self, reason: Result<(), Error>) {
self.queued_events self.queued_events
.push_back(TransportEvent::ListenerClosed { .push_back(TransportEvent::ListenerClosed {
listener_id: self.listener_id, listener_id: self.listener_id,
@ -345,8 +324,8 @@ impl RelayListener {
} }
} }
impl Stream for RelayListener { impl Stream for Listener {
type Item = TransportEvent<<ClientTransport as Transport>::ListenerUpgrade, RelayError>; type Item = TransportEvent<<Transport as libp2p_core::Transport>::ListenerUpgrade, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop { loop {
@ -398,17 +377,15 @@ impl Stream for RelayListener {
send_back_addr: Protocol::P2p(src_peer_id.into()).into(), send_back_addr: Protocol::P2p(src_peer_id.into()).into(),
}) })
} }
ToListenerMsg::Reservation(Err(())) => self.close(Err(RelayError::Reservation)), ToListenerMsg::Reservation(Err(())) => self.close(Err(Error::Reservation)),
}; };
} }
} }
} }
pub type RelayedDial = BoxFuture<'static, Result<RelayedConnection, RelayError>>;
/// Error that occurred during relay connection setup. /// Error that occurred during relay connection setup.
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum RelayError { pub enum Error {
#[error("Missing relay peer id.")] #[error("Missing relay peer id.")]
MissingRelayPeerId, MissingRelayPeerId,
#[error("Missing relay address.")] #[error("Missing relay address.")]
@ -433,13 +410,13 @@ pub enum RelayError {
Connect, Connect,
} }
impl From<RelayError> for TransportError<RelayError> { impl From<Error> for TransportError<Error> {
fn from(error: RelayError) -> Self { fn from(error: Error) -> Self {
TransportError::Other(error) TransportError::Other(error)
} }
} }
/// Message from the [`ClientTransport`] to the [`Relay`](crate::v2::relay::Relay) /// Message from the [`Transport`] to the [`Behaviour`](crate::Behaviour)
/// [`NetworkBehaviour`](libp2p_swarm::NetworkBehaviour). /// [`NetworkBehaviour`](libp2p_swarm::NetworkBehaviour).
pub enum TransportToBehaviourMsg { pub enum TransportToBehaviourMsg {
/// Dial destination node via relay node. /// Dial destination node via relay node.
@ -449,7 +426,7 @@ pub enum TransportToBehaviourMsg {
relay_peer_id: PeerId, relay_peer_id: PeerId,
dst_addr: Option<Multiaddr>, dst_addr: Option<Multiaddr>,
dst_peer_id: PeerId, dst_peer_id: PeerId,
send_back: oneshot::Sender<Result<RelayedConnection, ()>>, send_back: oneshot::Sender<Result<Connection, ()>>,
}, },
/// Listen for incoming relayed connections via relay node. /// Listen for incoming relayed connections via relay node.
ListenReq { ListenReq {
@ -463,7 +440,7 @@ pub enum TransportToBehaviourMsg {
pub enum ToListenerMsg { pub enum ToListenerMsg {
Reservation(Result<Reservation, ()>), Reservation(Result<Reservation, ()>),
IncomingRelayedConnection { IncomingRelayedConnection {
stream: RelayedConnection, stream: Connection,
src_peer_id: PeerId, src_peer_id: PeerId,
relay_peer_id: PeerId, relay_peer_id: PeerId,
relay_addr: Multiaddr, relay_addr: Multiaddr,

View File

@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::v2::message_proto; use crate::message_proto;
use std::time::Duration; use std::time::Duration;
pub mod inbound_hop; pub mod inbound_hop;

View File

@ -18,8 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::v2::message_proto::{hop_message, HopMessage, Limit, Reservation, Status}; use crate::message_proto::{hop_message, HopMessage, Limit, Reservation, Status};
use crate::v2::protocol::{HOP_PROTOCOL_NAME, MAX_MESSAGE_SIZE}; use crate::protocol::{HOP_PROTOCOL_NAME, MAX_MESSAGE_SIZE};
use asynchronous_codec::{Framed, FramedParts}; use asynchronous_codec::{Framed, FramedParts};
use bytes::Bytes; use bytes::Bytes;
use futures::{future::BoxFuture, prelude::*}; use futures::{future::BoxFuture, prelude::*};

View File

@ -18,8 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::v2::message_proto::{stop_message, Status, StopMessage}; use crate::message_proto::{stop_message, Status, StopMessage};
use crate::v2::protocol::{self, MAX_MESSAGE_SIZE, STOP_PROTOCOL_NAME}; use crate::protocol::{self, MAX_MESSAGE_SIZE, STOP_PROTOCOL_NAME};
use asynchronous_codec::{Framed, FramedParts}; use asynchronous_codec::{Framed, FramedParts};
use bytes::Bytes; use bytes::Bytes;
use futures::{future::BoxFuture, prelude::*}; use futures::{future::BoxFuture, prelude::*};

View File

@ -18,8 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::v2::message_proto::{hop_message, HopMessage, Peer, Status}; use crate::message_proto::{hop_message, HopMessage, Peer, Status};
use crate::v2::protocol::{Limit, HOP_PROTOCOL_NAME, MAX_MESSAGE_SIZE}; use crate::protocol::{Limit, HOP_PROTOCOL_NAME, MAX_MESSAGE_SIZE};
use asynchronous_codec::{Framed, FramedParts}; use asynchronous_codec::{Framed, FramedParts};
use bytes::Bytes; use bytes::Bytes;
use futures::{future::BoxFuture, prelude::*}; use futures::{future::BoxFuture, prelude::*};

View File

@ -18,8 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::v2::message_proto::{stop_message, Limit, Peer, Status, StopMessage}; use crate::message_proto::{stop_message, Limit, Peer, Status, StopMessage};
use crate::v2::protocol::{MAX_MESSAGE_SIZE, STOP_PROTOCOL_NAME}; use crate::protocol::{MAX_MESSAGE_SIZE, STOP_PROTOCOL_NAME};
use asynchronous_codec::{Framed, FramedParts}; use asynchronous_codec::{Framed, FramedParts};
use bytes::Bytes; use bytes::Bytes;
use futures::{future::BoxFuture, prelude::*}; use futures::{future::BoxFuture, prelude::*};

View File

@ -21,30 +21,154 @@
//! Implementation of the [libp2p circuit relay v2 //! Implementation of the [libp2p circuit relay v2
//! specification](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md). //! specification](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md).
#[allow(clippy::derive_partial_eq_without_eq)] pub mod client {
mod message_proto { #[deprecated(since = "0.15.0", note = "Use libp2p_relay::client::Event instead.")]
include!(concat!(env!("OUT_DIR"), "/message_v2.pb.rs")); pub type Event = crate::client::Event;
}
pub mod client; #[deprecated(
mod copy_future; since = "0.15.0",
mod protocol; note = "Use libp2p_relay::client::Behaviour instead."
pub mod relay; )]
pub type Client = crate::client::Behaviour;
pub use protocol::{ #[deprecated(
inbound_hop::FatalUpgradeError as InboundHopFatalUpgradeError, since = "0.15.0",
inbound_stop::FatalUpgradeError as InboundStopFatalUpgradeError, note = "Use libp2p_relay::client::Connection instead."
outbound_hop::FatalUpgradeError as OutboundHopFatalUpgradeError, )]
outbound_stop::FatalUpgradeError as OutboundStopFatalUpgradeError, HOP_PROTOCOL_NAME, pub type RelayedConnection = crate::client::Connection;
STOP_PROTOCOL_NAME,
};
/// The ID of an outgoing / incoming, relay / destination request. pub mod transport {
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] use futures::future::BoxFuture;
pub struct RequestId(u64);
impl RequestId { #[deprecated(
fn new() -> RequestId { since = "0.15.0",
RequestId(rand::random()) note = "Use libp2p_relay::client::Transport instead."
)]
pub type ClientTransport = crate::client::Transport;
#[deprecated(
since = "0.15.0",
note = "RelayListener will become crate-private in the future
as it shouldn't be required by end users."
)]
pub type RelayListener = crate::priv_client::transport::Listener;
#[deprecated(
since = "0.15.0",
note = "RelayedDial type alias will be deprecated,
users should create the alias themselves if needed."
)]
pub type RelayedDial = BoxFuture<
'static,
Result<crate::client::Connection, crate::priv_client::transport::Error>,
>;
#[deprecated(
since = "0.15.0",
note = "Use libp2p_relay::client::transport::Error instead."
)]
pub type RelayError = crate::client::transport::Error;
#[deprecated(
since = "0.15.0",
note = "TransportToBehaviourMsg will become crate-private in the future
as it shouldn't be required by end users."
)]
pub type TransportToBehaviourMsg = crate::priv_client::transport::TransportToBehaviourMsg;
#[deprecated(
since = "0.15.0",
note = "ToListenerMsg will become crate-private in the future
as it shouldn't be required by end users."
)]
pub type ToListenerMsg = crate::priv_client::transport::ToListenerMsg;
#[deprecated(
since = "0.15.0",
note = "Reservation will become crate-private in the future
as it shouldn't be required by end users."
)]
pub type Reservation = crate::priv_client::transport::Reservation;
} }
} }
pub mod relay {
#[deprecated(since = "0.15.0", note = "Use libp2p_relay::Config instead.")]
pub type Config = crate::Config;
#[deprecated(since = "0.15.0", note = "Use libp2p_relay::Event instead.")]
pub type Event = crate::Event;
#[deprecated(since = "0.15.0", note = "Use libp2p_relay::Behaviour instead.")]
pub type Relay = crate::Behaviour;
#[deprecated(since = "0.15.0", note = "Use libp2p_relay::CircuitId instead.")]
pub type CircuitId = crate::CircuitId;
pub mod rate_limiter {
use instant::Instant;
use libp2p_core::{Multiaddr, PeerId};
#[deprecated(
since = "0.15.0",
note = "Use libp2p_relay::behaviour::rate_limiter::RateLimiter instead."
)]
pub trait RateLimiter: Send {
fn try_next(&mut self, peer: PeerId, addr: &Multiaddr, now: Instant) -> bool;
}
#[allow(deprecated)]
impl<T> RateLimiter for T
where
T: crate::behaviour::rate_limiter::RateLimiter,
{
fn try_next(&mut self, peer: PeerId, addr: &Multiaddr, now: Instant) -> bool {
self.try_next(peer, addr, now)
}
}
}
}
pub mod protocol {
#[deprecated(
since = "0.15.0",
note = "Use libp2p_relay::inbound::hop::FatalUpgradeError instead."
)]
pub type InboundHopFatalUpgradeError = crate::inbound::hop::FatalUpgradeError;
#[deprecated(
since = "0.15.0",
note = "Use libp2p_relay::inbound::stop::FatalUpgradeError instead."
)]
pub type InboundStopFatalUpgradeError = crate::inbound::stop::FatalUpgradeError;
#[deprecated(
since = "0.15.0",
note = "Use libp2p_relay::outbound::hop::FatalUpgradeError instead."
)]
pub type OutboundHopFatalUpgradeError = crate::outbound::hop::FatalUpgradeError;
#[deprecated(
since = "0.15.0",
note = "Use libp2p_relay::outbound::stop::FatalUpgradeError instead."
)]
pub type OutboundStopFatalUpgradeError = crate::outbound::stop::FatalUpgradeError;
#[deprecated(
since = "0.15.0",
note = "Use libp2p_relay::HOP_PROTOCOL_NAME instead."
)]
pub const HOP_PROTOCOL_NAME: &[u8; 31] = crate::HOP_PROTOCOL_NAME;
#[deprecated(
since = "0.15.0",
note = "Use libp2p_relay::STOP_PROTOCOL_NAME instead."
)]
pub const STOP_PROTOCOL_NAME: &[u8; 32] = crate::STOP_PROTOCOL_NAME;
}
#[deprecated(
since = "0.15.0",
note = "RequestId will be deprecated as it isn't used"
)]
pub type RequestId = super::RequestId;

View File

@ -31,8 +31,7 @@ use libp2p_core::PublicKey;
use libp2p_core::{identity, upgrade, PeerId}; use libp2p_core::{identity, upgrade, PeerId};
use libp2p_ping as ping; use libp2p_ping as ping;
use libp2p_plaintext::PlainText2Config; use libp2p_plaintext::PlainText2Config;
use libp2p_relay::v2::client; use libp2p_relay as relay;
use libp2p_relay::v2::relay;
use libp2p_swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p_swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent};
use std::time::Duration; use std::time::Duration;
@ -141,7 +140,7 @@ fn new_reservation_to_same_relay_replaces_old() {
} }
} }
SwarmEvent::Behaviour(ClientEvent::Relay( SwarmEvent::Behaviour(ClientEvent::Relay(
client::Event::ReservationReqAccepted { relay::client::Event::ReservationReqAccepted {
relay_peer_id: peer_id, relay_peer_id: peer_id,
.. ..
}, },
@ -219,7 +218,7 @@ fn connect() {
break break
} }
SwarmEvent::Behaviour(ClientEvent::Relay( SwarmEvent::Behaviour(ClientEvent::Relay(
client::Event::OutboundCircuitEstablished { .. }, relay::client::Event::OutboundCircuitEstablished { .. },
)) => {} )) => {}
SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. })) SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. }))
if peer == relay_peer_id => {} if peer == relay_peer_id => {}
@ -295,7 +294,7 @@ fn build_relay() -> Swarm<Relay> {
transport, transport,
Relay { Relay {
ping: ping::Behaviour::new(ping::Config::new()), ping: ping::Behaviour::new(ping::Config::new()),
relay: relay::Relay::new( relay: relay::Behaviour::new(
local_peer_id, local_peer_id,
relay::Config { relay::Config {
reservation_duration: Duration::from_secs(2), reservation_duration: Duration::from_secs(2),
@ -312,7 +311,7 @@ fn build_client() -> Swarm<Client> {
let local_public_key = local_key.public(); let local_public_key = local_key.public();
let local_peer_id = local_public_key.to_peer_id(); let local_peer_id = local_public_key.to_peer_id();
let (relay_transport, behaviour) = client::Client::new_transport_and_behaviour(local_peer_id); let (relay_transport, behaviour) = relay::client::new(local_peer_id);
let transport = upgrade_transport( let transport = upgrade_transport(
OrTransport::new(relay_transport, MemoryTransport::default()).boxed(), OrTransport::new(relay_transport, MemoryTransport::default()).boxed(),
local_public_key, local_public_key,
@ -343,63 +342,19 @@ where
} }
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour( #[behaviour(prelude = "libp2p_swarm::derive_prelude")]
out_event = "RelayEvent",
event_process = false,
prelude = "libp2p_swarm::derive_prelude"
)]
struct Relay { struct Relay {
relay: relay::Relay, relay: relay::Behaviour,
ping: ping::Behaviour, ping: ping::Behaviour,
} }
#[derive(Debug)]
enum RelayEvent {
Relay(relay::Event),
Ping(ping::Event),
}
impl From<relay::Event> for RelayEvent {
fn from(event: relay::Event) -> Self {
RelayEvent::Relay(event)
}
}
impl From<ping::Event> for RelayEvent {
fn from(event: ping::Event) -> Self {
RelayEvent::Ping(event)
}
}
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour( #[behaviour(prelude = "libp2p_swarm::derive_prelude")]
out_event = "ClientEvent",
event_process = false,
prelude = "libp2p_swarm::derive_prelude"
)]
struct Client { struct Client {
relay: client::Client, relay: relay::client::Behaviour,
ping: ping::Behaviour, ping: ping::Behaviour,
} }
#[derive(Debug)]
enum ClientEvent {
Relay(client::Event),
Ping(ping::Event),
}
impl From<client::Event> for ClientEvent {
fn from(event: client::Event) -> Self {
ClientEvent::Relay(event)
}
}
impl From<ping::Event> for ClientEvent {
fn from(event: ping::Event) -> Self {
ClientEvent::Ping(event)
}
}
fn spawn_swarm_on_pool<B: NetworkBehaviour + Send>(pool: &LocalPool, swarm: Swarm<B>) { fn spawn_swarm_on_pool<B: NetworkBehaviour + Send>(pool: &LocalPool, swarm: Swarm<B>) {
pool.spawner() pool.spawner()
.spawn_obj(swarm.collect::<Vec<_>>().map(|_| ()).boxed().into()) .spawn_obj(swarm.collect::<Vec<_>>().map(|_| ()).boxed().into())
@ -417,11 +372,13 @@ async fn wait_for_reservation(
loop { loop {
match client.select_next_some().await { match client.select_next_some().await {
SwarmEvent::Behaviour(ClientEvent::Relay(client::Event::ReservationReqAccepted { SwarmEvent::Behaviour(ClientEvent::Relay(
relay_peer_id: peer_id, relay::client::Event::ReservationReqAccepted {
renewal, relay_peer_id: peer_id,
.. renewal,
})) if relay_peer_id == peer_id && renewal == is_renewal => { ..
},
)) if relay_peer_id == peer_id && renewal == is_renewal => {
reservation_req_accepted = true; reservation_req_accepted = true;
if new_listen_addr { if new_listen_addr {
break; break;