protocols/: Add basic AutoNAT implementation (#2262)

This commit adds a behaviour protocol that implements the AutoNAT specification.
It enables users to detect whether they are behind a NAT. The Autonat Protocol
implements a Codec for the Request-Response protocol, and wraps it in a new
Network Behaviour with some additional functionality.

Co-authored-by: David Craven <david@craven.ch>
Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Elena Frank 2022-01-14 10:27:28 +01:00 committed by GitHub
parent ddc035170d
commit c61ea6ad29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 2970 additions and 0 deletions

View File

@ -12,6 +12,7 @@ categories = ["network-programming", "asynchronous"]
[features]
default = [
"autonat",
"deflate",
"dns-async-std",
"floodsub",
@ -34,6 +35,7 @@ default = [
"websocket",
"yamux",
]
autonat = ["libp2p-autonat"]
deflate = ["libp2p-deflate"]
dns-async-std = ["libp2p-dns", "libp2p-dns/async-std"]
dns-tokio = ["libp2p-dns", "libp2p-dns/tokio"]
@ -73,6 +75,8 @@ futures-timer = "3.0.2" # Explicit dependency to be used in `wasm-bindgen` featu
getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` feature
instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature
lazy_static = "1.2"
libp2p-autonat = { version = "0.20.0", path = "protocols/autonat", optional = true }
libp2p-core = { version = "0.31.0", path = "core", default-features = false }
libp2p-floodsub = { version = "0.33.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.35.0", path = "./protocols/gossipsub", optional = true }
@ -120,6 +124,7 @@ members = [
"misc/peer-id-generator",
"muxers/mplex",
"muxers/yamux",
"protocols/autonat",
"protocols/floodsub",
"protocols/gossipsub",
"protocols/rendezvous",

View File

@ -0,0 +1,36 @@
[package]
name = "libp2p-autonat"
edition = "2021"
rust-version = "1.56.1"
version = "0.20.0"
authors = ["David Craven <david@craven.ch>", "Elena Frank <elena.frank@protonmail.com>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[build-dependencies]
prost-build = "0.6"
[dependencies]
async-trait = "0.1"
futures = "0.3"
futures-timer = "3.0"
instant = "0.1"
libp2p-core = { version = "0.31.0", path = "../../core", default-features = false }
libp2p-swarm = { version = "0.33.0", path = "../../swarm" }
libp2p-request-response = { version = "0.15.0", path = "../request-response" }
log = "0.4"
rand = "0.8"
prost = "0.8"
[dev-dependencies]
async-std = { version = "1.10", features = ["attributes"] }
env_logger = "0.9"
structopt = "0.3"
[dev-dependencies.libp2p]
path = "../../"
default-features = false
features = ["autonat", "dns-async-std", "identify", "mplex", "noise", "tcp-async-io", "websocket", "yamux"]

View File

@ -0,0 +1,23 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
fn main() {
prost_build::compile_protos(&["src/structs.proto"], &["src"]).unwrap();
}

View File

@ -0,0 +1,135 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Basic example that combines the AutoNAT and identify protocols.
//!
//! The identify protocol informs the local peer of its external addresses, that are then send in AutoNAT dial-back
//! requests to the server.
//!
//! To run this example, follow the instructions in `examples/server` to start a server, then run in a new terminal:
//! ```sh
//! cargo run --example client -- --server-address <server-addr> --server-peer-id <server-peer-id> --listen_port <port>
//! ```
//! The `listen_port` parameter is optional and allows to set a fixed port at which the local client should listen.
use futures::prelude::*;
use libp2p::autonat;
use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::{identity, Multiaddr, NetworkBehaviour, PeerId};
use std::error::Error;
use std::net::Ipv4Addr;
use std::time::Duration;
use structopt::StructOpt;
#[derive(Debug, StructOpt)]
#[structopt(name = "libp2p autonat")]
struct Opt {
#[structopt(long)]
listen_port: Option<u16>,
#[structopt(long)]
server_address: Multiaddr,
#[structopt(long)]
server_peer_id: PeerId,
}
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
let opt = Opt::from_args();
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {:?}", local_peer_id);
let transport = libp2p::development_transport(local_key.clone()).await?;
let behaviour = Behaviour::new(local_key.public());
let mut swarm = Swarm::new(transport, behaviour, local_peer_id);
swarm.listen_on(
Multiaddr::empty()
.with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))
.with(Protocol::Tcp(opt.listen_port.unwrap_or(0))),
)?;
swarm
.behaviour_mut()
.auto_nat
.add_server(opt.server_peer_id, Some(opt.server_address));
loop {
match swarm.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address),
SwarmEvent::Behaviour(event) => println!("{:?}", event),
e => println!("{:?}", e),
}
}
}
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Event")]
struct Behaviour {
identify: Identify,
auto_nat: autonat::Behaviour,
}
impl Behaviour {
fn new(local_public_key: identity::PublicKey) -> Self {
Self {
identify: Identify::new(IdentifyConfig::new(
"/ipfs/0.1.0".into(),
local_public_key.clone(),
)),
auto_nat: autonat::Behaviour::new(
local_public_key.to_peer_id(),
autonat::Config {
retry_interval: Duration::from_secs(10),
refresh_interval: Duration::from_secs(30),
boot_delay: Duration::from_secs(5),
throttle_server_period: Duration::ZERO,
..Default::default()
},
),
}
}
}
#[derive(Debug)]
enum Event {
AutoNat(autonat::Event),
Identify(IdentifyEvent),
}
impl From<IdentifyEvent> for Event {
fn from(v: IdentifyEvent) -> Self {
Self::Identify(v)
}
}
impl From<autonat::Event> for Event {
fn from(v: autonat::Event) -> Self {
Self::AutoNat(v)
}
}

View File

@ -0,0 +1,114 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Basic example for a AutoNAT server that supports the /libp2p/autonat/1.0.0 and "/ipfs/0.1.0" protocols.
//!
//! To start the server run:
//! ```sh
//! cargo run --example server -- --listen_port <port>
//! ```
//! The `listen_port` parameter is optional and allows to set a fixed port at which the local peer should listen.
use futures::prelude::*;
use libp2p::autonat;
use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::{identity, Multiaddr, NetworkBehaviour, PeerId};
use std::error::Error;
use std::net::Ipv4Addr;
use structopt::StructOpt;
#[derive(Debug, StructOpt)]
#[structopt(name = "libp2p autonat")]
struct Opt {
#[structopt(long)]
listen_port: Option<u16>,
}
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
let opt = Opt::from_args();
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {:?}", local_peer_id);
let transport = libp2p::development_transport(local_key.clone()).await?;
let behaviour = Behaviour::new(local_key.public());
let mut swarm = Swarm::new(transport, behaviour, local_peer_id);
swarm.listen_on(
Multiaddr::empty()
.with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))
.with(Protocol::Tcp(opt.listen_port.unwrap_or(0))),
)?;
loop {
match swarm.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address),
SwarmEvent::Behaviour(event) => println!("{:?}", event),
e => println!("{:?}", e),
}
}
}
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Event")]
struct Behaviour {
identify: Identify,
auto_nat: autonat::Behaviour,
}
impl Behaviour {
fn new(local_public_key: identity::PublicKey) -> Self {
Self {
identify: Identify::new(IdentifyConfig::new(
"/ipfs/0.1.0".into(),
local_public_key.clone(),
)),
auto_nat: autonat::Behaviour::new(
local_public_key.to_peer_id(),
autonat::Config::default(),
),
}
}
}
#[derive(Debug)]
enum Event {
AutoNat(autonat::Event),
Identify(IdentifyEvent),
}
impl From<IdentifyEvent> for Event {
fn from(v: IdentifyEvent) -> Self {
Self::Identify(v)
}
}
impl From<autonat::Event> for Event {
fn from(v: autonat::Event) -> Self {
Self::AutoNat(v)
}
}

View File

@ -0,0 +1,501 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod as_client;
mod as_server;
use crate::protocol::{AutoNatCodec, AutoNatProtocol, DialRequest, DialResponse, ResponseError};
use as_client::AsClient;
pub use as_client::{OutboundProbeError, OutboundProbeEvent};
use as_server::AsServer;
pub use as_server::{InboundProbeError, InboundProbeEvent};
use futures_timer::Delay;
use instant::Instant;
use libp2p_core::{
connection::{ConnectionId, ListenerId},
ConnectedPoint, Multiaddr, PeerId,
};
use libp2p_request_response::{
handler::RequestResponseHandlerEvent, ProtocolSupport, RequestId, RequestResponse,
RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
};
use libp2p_swarm::{
DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use std::{
collections::{HashMap, VecDeque},
iter,
task::{Context, Poll},
time::Duration,
};
/// Config for the [`Behaviour`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Config {
/// Timeout for requests.
pub timeout: Duration,
// Client Config
/// Delay on init before starting the fist probe.
pub boot_delay: Duration,
/// Interval in which the NAT should be tested again if max confidence was reached in a status.
pub refresh_interval: Duration,
/// Interval in which the NAT status should be re-tried if it is currently unknown
/// or max confidence was not reached yet.
pub retry_interval: Duration,
/// Throttle period for re-using a peer as server for a dial-request.
pub throttle_server_period: Duration,
/// Use connected peers as servers for probes.
pub use_connected: bool,
/// Max confidence that can be reached in a public / private NAT status.
/// Note: for [`NatStatus::Unknown`] the confidence is always 0.
pub confidence_max: usize,
// Server Config
/// Max addresses that are tried per peer.
pub max_peer_addresses: usize,
/// Max total dial requests done in `[Config::throttle_clients_period`].
pub throttle_clients_global_max: usize,
/// Max dial requests done in `[Config::throttle_clients_period`] for a peer.
pub throttle_clients_peer_max: usize,
/// Period for throttling clients requests.
pub throttle_clients_period: Duration,
}
impl Default for Config {
fn default() -> Self {
Config {
timeout: Duration::from_secs(30),
boot_delay: Duration::from_secs(15),
retry_interval: Duration::from_secs(90),
refresh_interval: Duration::from_secs(15 * 60),
throttle_server_period: Duration::from_secs(90),
use_connected: true,
confidence_max: 3,
max_peer_addresses: 16,
throttle_clients_global_max: 30,
throttle_clients_peer_max: 3,
throttle_clients_period: Duration::from_secs(1),
}
}
}
/// Assumed NAT status.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NatStatus {
Public(Multiaddr),
Private,
Unknown,
}
impl NatStatus {
pub fn is_public(&self) -> bool {
matches!(self, NatStatus::Public(..))
}
}
/// Unique identifier for a probe.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProbeId(usize);
impl ProbeId {
fn next(&mut self) -> ProbeId {
let current = *self;
self.0 += 1;
current
}
}
/// Event produced by [`Behaviour`].
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
/// Event on an inbound probe.
InboundProbe(InboundProbeEvent),
/// Event on an outbound probe.
OutboundProbe(OutboundProbeEvent),
/// The assumed NAT changed.
StatusChanged {
/// Former status.
old: NatStatus,
/// New status.
new: NatStatus,
},
}
/// [`NetworkBehaviour`] for AutoNAT.
///
/// The behaviour frequently runs probes to determine whether the local peer is behind NAT and/ or a firewall, or
/// publicly reachable.
/// In a probe, a dial-back request is sent to a peer that is randomly selected from the list of fixed servers and
/// connected peers. Upon receiving a dial-back request, the remote tries to dial the included addresses. When a
/// first address was successfully dialed, a status Ok will be send back together with the dialed address. If no address
/// can be reached a dial-error is send back.
/// Based on the received response, the sender assumes themselves to be public or private.
/// The status is retried in a frequency of [`Config::retry_interval`] or [`Config::retry_interval`], depending on whether
/// enough confidence in the assumed NAT status was reached or not.
/// The confidence increases each time a probe confirms the assumed status, and decreases if a different status is reported.
/// If the confidence is 0, the status is flipped and the Behaviour will report the new status in an `OutEvent`.
pub struct Behaviour {
// Local peer id
local_peer_id: PeerId,
// Inner behaviour for sending requests and receiving the response.
inner: RequestResponse<AutoNatCodec>,
config: Config,
// Additional peers apart from the currently connected ones, that may be used for probes.
servers: Vec<PeerId>,
// Assumed NAT status.
nat_status: NatStatus,
// Confidence in the assumed NAT status.
confidence: usize,
// Timer for the next probe.
schedule_probe: Delay,
// Ongoing inbound requests, where no response has been sent back to the remote yet.
ongoing_inbound: HashMap<
PeerId,
(
ProbeId,
RequestId,
Vec<Multiaddr>,
ResponseChannel<DialResponse>,
),
>,
// Ongoing outbound probes and mapped to the inner request id.
ongoing_outbound: HashMap<RequestId, ProbeId>,
// Connected peers with the observed address of each connection.
// If the endpoint of a connection is relayed, the observed address is `None`.
connected: HashMap<PeerId, HashMap<ConnectionId, Option<Multiaddr>>>,
// Used servers in recent outbound probes that are throttled through Config::throttle_server_period.
throttled_servers: Vec<(PeerId, Instant)>,
// Recent probes done for clients
throttled_clients: Vec<(PeerId, Instant)>,
last_probe: Option<Instant>,
pending_out_events: VecDeque<<Self as NetworkBehaviour>::OutEvent>,
probe_id: ProbeId,
}
impl Behaviour {
pub fn new(local_peer_id: PeerId, config: Config) -> Self {
let protocols = iter::once((AutoNatProtocol, ProtocolSupport::Full));
let mut cfg = RequestResponseConfig::default();
cfg.set_request_timeout(config.timeout);
let inner = RequestResponse::new(AutoNatCodec, protocols, cfg);
Self {
local_peer_id,
inner,
schedule_probe: Delay::new(config.boot_delay),
config,
servers: Vec::new(),
ongoing_inbound: HashMap::default(),
ongoing_outbound: HashMap::default(),
connected: HashMap::default(),
nat_status: NatStatus::Unknown,
confidence: 0,
throttled_servers: Vec::new(),
throttled_clients: Vec::new(),
last_probe: None,
pending_out_events: VecDeque::new(),
probe_id: ProbeId(0),
}
}
/// Assumed public address of the local peer.
/// Returns `None` in case of status [`NatStatus::Private`] or [`NatStatus::Unknown`].
pub fn public_address(&self) -> Option<&Multiaddr> {
match &self.nat_status {
NatStatus::Public(address) => Some(address),
_ => None,
}
}
/// Assumed NAT status.
pub fn nat_status(&self) -> NatStatus {
self.nat_status.clone()
}
/// Confidence in the assumed NAT status.
pub fn confidence(&self) -> usize {
self.confidence
}
/// Add a peer to the list over servers that may be used for probes.
/// These peers are used for dial-request even if they are currently not connection, in which case a connection will be
/// establish before sending the dial-request.
pub fn add_server(&mut self, peer: PeerId, address: Option<Multiaddr>) {
self.servers.push(peer);
if let Some(addr) = address {
self.inner.add_address(&peer, addr);
}
}
/// Remove a peer from the list of servers.
/// See [`Behaviour::add_server`] for more info.
pub fn remove_server(&mut self, peer: &PeerId) {
self.servers.retain(|p| p != peer);
}
fn as_client(&mut self) -> AsClient {
AsClient {
inner: &mut self.inner,
local_peer_id: self.local_peer_id,
config: &self.config,
connected: &self.connected,
probe_id: &mut self.probe_id,
servers: &self.servers,
throttled_servers: &mut self.throttled_servers,
nat_status: &mut self.nat_status,
confidence: &mut self.confidence,
ongoing_outbound: &mut self.ongoing_outbound,
last_probe: &mut self.last_probe,
schedule_probe: &mut self.schedule_probe,
}
}
fn as_server(&mut self) -> AsServer {
AsServer {
inner: &mut self.inner,
config: &self.config,
connected: &self.connected,
probe_id: &mut self.probe_id,
throttled_clients: &mut self.throttled_clients,
ongoing_inbound: &mut self.ongoing_inbound,
}
}
}
impl NetworkBehaviour for Behaviour {
type ProtocolsHandler = <RequestResponse<AutoNatCodec> as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = Event;
fn inject_connection_established(
&mut self,
peer: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
) {
self.inner
.inject_connection_established(peer, conn, endpoint, failed_addresses);
let connections = self.connected.entry(*peer).or_default();
let addr = if endpoint.is_relayed() {
None
} else {
Some(endpoint.get_remote_address().clone())
};
connections.insert(*conn, addr);
match endpoint {
ConnectedPoint::Dialer { address } => {
if let Some(event) = self.as_server().on_outbound_connection(peer, address) {
self.pending_out_events
.push_back(Event::InboundProbe(event));
}
}
ConnectedPoint::Listener { .. } => self.as_client().on_inbound_connection(),
}
}
fn inject_connection_closed(
&mut self,
peer: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
self.inner
.inject_connection_closed(peer, conn, endpoint, handler);
let connections = self.connected.get_mut(peer).expect("Peer is connected.");
connections.remove(conn);
}
fn inject_dial_failure(
&mut self,
peer: Option<PeerId>,
handler: Self::ProtocolsHandler,
error: &DialError,
) {
self.inner.inject_dial_failure(peer, handler, error);
if let Some(event) = self.as_server().on_outbound_dial_error(peer, error) {
self.pending_out_events
.push_back(Event::InboundProbe(event));
}
}
fn inject_disconnected(&mut self, peer: &PeerId) {
self.inner.inject_disconnected(peer);
self.connected.remove(peer);
}
fn inject_address_change(
&mut self,
peer: &PeerId,
conn: &ConnectionId,
old: &ConnectedPoint,
new: &ConnectedPoint,
) {
self.inner.inject_address_change(peer, conn, old, new);
if old.is_relayed() && new.is_relayed() {
return;
}
let connections = self.connected.get_mut(peer).expect("Peer is connected.");
let addr = if new.is_relayed() {
None
} else {
Some(new.get_remote_address().clone())
};
connections.insert(*conn, addr);
}
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.inner.inject_new_listen_addr(id, addr);
self.as_client().on_new_address();
}
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.inner.inject_expired_listen_addr(id, addr);
self.as_client().on_expired_address(addr);
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_new_external_addr(addr);
self.as_client().on_new_address();
}
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_expired_external_addr(addr);
self.as_client().on_expired_address(addr);
}
fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll<Action> {
loop {
if let Some(event) = self.pending_out_events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
let mut is_inner_pending = false;
match self.inner.poll(cx, params) {
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
let (mut events, action) = match event {
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
}
| RequestResponseEvent::OutboundFailure { .. } => {
self.as_client().handle_event(params, event)
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Request { .. },
..
}
| RequestResponseEvent::InboundFailure { .. } => {
self.as_server().handle_event(params, event)
}
RequestResponseEvent::ResponseSent { .. } => (VecDeque::new(), None),
};
self.pending_out_events.append(&mut events);
if let Some(action) = action {
return Poll::Ready(action);
}
}
Poll::Ready(action) => return Poll::Ready(action.map_out(|_| unreachable!())),
Poll::Pending => is_inner_pending = true,
}
match self.as_client().poll_auto_probe(params, cx) {
Poll::Ready(event) => self
.pending_out_events
.push_back(Event::OutboundProbe(event)),
Poll::Pending if is_inner_pending => return Poll::Pending,
Poll::Pending => {}
}
}
}
fn new_handler(&mut self) -> Self::ProtocolsHandler {
self.inner.new_handler()
}
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
self.inner.addresses_of_peer(peer)
}
fn inject_connected(&mut self, peer: &PeerId) {
self.inner.inject_connected(peer)
}
fn inject_event(
&mut self,
peer_id: PeerId,
conn: ConnectionId,
event: RequestResponseHandlerEvent<AutoNatCodec>,
) {
self.inner.inject_event(peer_id, conn, event)
}
fn inject_listen_failure(
&mut self,
local_addr: &Multiaddr,
send_back_addr: &Multiaddr,
handler: Self::ProtocolsHandler,
) {
self.inner
.inject_listen_failure(local_addr, send_back_addr, handler)
}
fn inject_new_listener(&mut self, id: ListenerId) {
self.inner.inject_new_listener(id)
}
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
self.inner.inject_listener_error(id, err)
}
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) {
self.inner.inject_listener_closed(id, reason)
}
}
type Action = NetworkBehaviourAction<
<Behaviour as NetworkBehaviour>::OutEvent,
<Behaviour as NetworkBehaviour>::ProtocolsHandler,
>;
// Trait implemented for `AsClient` as `AsServer` to handle events from the inner [`RequestResponse`] Protocol.
trait HandleInnerEvent {
fn handle_event(
&mut self,
params: &mut impl PollParameters,
event: RequestResponseEvent<DialRequest, DialResponse>,
) -> (VecDeque<Event>, Option<Action>);
}

View File

@ -0,0 +1,373 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::ResponseError;
use super::{
Action, AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, NatStatus,
ProbeId,
};
use futures::FutureExt;
use futures_timer::Delay;
use instant::Instant;
use libp2p_core::{connection::ConnectionId, Multiaddr, PeerId};
use libp2p_request_response::{
OutboundFailure, RequestId, RequestResponse, RequestResponseEvent, RequestResponseMessage,
};
use libp2p_swarm::{AddressScore, NetworkBehaviourAction, PollParameters};
use rand::{seq::SliceRandom, thread_rng};
use std::{
collections::{HashMap, VecDeque},
task::{Context, Poll},
time::Duration,
};
/// Outbound probe failed or was aborted.
#[derive(Debug, Clone, PartialEq)]
pub enum OutboundProbeError {
/// Probe was aborted because no server is known, or all servers
/// are throttled through [`Config::throttle_server_period`].
NoServer,
/// Probe was aborted because the local peer has no listening or
/// external addresses.
NoAddresses,
/// Sending the dial-back request or receiving a response failed.
OutboundRequest(OutboundFailure),
/// The server refused or failed to dial us.
Response(ResponseError),
}
#[derive(Debug, Clone, PartialEq)]
pub enum OutboundProbeEvent {
/// A dial-back request was sent to a remote peer.
Request {
probe_id: ProbeId,
/// Peer to which the request is sent.
peer: PeerId,
},
/// The remote successfully dialed one of our addresses.
Response {
probe_id: ProbeId,
/// Id of the peer that sent the response.
peer: PeerId,
/// The address at which the remote succeeded to dial us.
address: Multiaddr,
},
/// The outbound request failed, was rejected, or the remote could dial
/// none of our addresses.
Error {
probe_id: ProbeId,
/// Id of the peer used for the probe.
/// `None` if the probe was aborted due to no addresses or no qualified server.
peer: Option<PeerId>,
error: OutboundProbeError,
},
}
/// View over [`super::Behaviour`] in a client role.
pub struct AsClient<'a> {
pub inner: &'a mut RequestResponse<AutoNatCodec>,
pub local_peer_id: PeerId,
pub config: &'a Config,
pub connected: &'a HashMap<PeerId, HashMap<ConnectionId, Option<Multiaddr>>>,
pub probe_id: &'a mut ProbeId,
pub servers: &'a Vec<PeerId>,
pub throttled_servers: &'a mut Vec<(PeerId, Instant)>,
pub nat_status: &'a mut NatStatus,
pub confidence: &'a mut usize,
pub ongoing_outbound: &'a mut HashMap<RequestId, ProbeId>,
pub last_probe: &'a mut Option<Instant>,
pub schedule_probe: &'a mut Delay,
}
impl<'a> HandleInnerEvent for AsClient<'a> {
fn handle_event(
&mut self,
params: &mut impl PollParameters,
event: RequestResponseEvent<DialRequest, DialResponse>,
) -> (VecDeque<Event>, Option<Action>) {
let mut events = VecDeque::new();
let mut action = None;
match event {
RequestResponseEvent::Message {
peer,
message:
RequestResponseMessage::Response {
request_id,
response,
},
} => {
log::debug!("Outbound dial-back request returned {:?}.", response);
let probe_id = self
.ongoing_outbound
.remove(&request_id)
.expect("RequestId exists.");
let event = match response.result.clone() {
Ok(address) => OutboundProbeEvent::Response {
probe_id,
peer,
address,
},
Err(e) => OutboundProbeEvent::Error {
probe_id,
peer: Some(peer),
error: OutboundProbeError::Response(e),
},
};
events.push_back(Event::OutboundProbe(event));
if let Some(old) = self.handle_reported_status(response.result.clone().into()) {
events.push_back(Event::StatusChanged {
old,
new: self.nat_status.clone(),
});
}
if let Ok(address) = response.result {
// Update observed address score if it is finite.
let score = params
.external_addresses()
.find_map(|r| (r.addr == address).then(|| r.score))
.unwrap_or(AddressScore::Finite(0));
if let AddressScore::Finite(finite_score) = score {
action = Some(NetworkBehaviourAction::ReportObservedAddr {
address,
score: AddressScore::Finite(finite_score + 1),
});
}
}
}
RequestResponseEvent::OutboundFailure {
peer,
error,
request_id,
} => {
log::debug!(
"Outbound Failure {} when on dial-back request to peer {}.",
error,
peer
);
let probe_id = self
.ongoing_outbound
.remove(&request_id)
.unwrap_or_else(|| self.probe_id.next());
events.push_back(Event::OutboundProbe(OutboundProbeEvent::Error {
probe_id,
peer: Some(peer),
error: OutboundProbeError::OutboundRequest(error),
}));
self.schedule_probe.reset(Duration::ZERO);
}
_ => {}
}
(events, action)
}
}
impl<'a> AsClient<'a> {
pub fn poll_auto_probe(
&mut self,
params: &mut impl PollParameters,
cx: &mut Context<'_>,
) -> Poll<OutboundProbeEvent> {
match self.schedule_probe.poll_unpin(cx) {
Poll::Ready(()) => {
self.schedule_probe.reset(self.config.retry_interval);
let mut addresses: Vec<_> = params.external_addresses().map(|r| r.addr).collect();
addresses.extend(params.listened_addresses());
let probe_id = self.probe_id.next();
let event = match self.do_probe(probe_id, addresses) {
Ok(peer) => OutboundProbeEvent::Request { probe_id, peer },
Err(error) => {
self.handle_reported_status(NatStatus::Unknown);
OutboundProbeEvent::Error {
probe_id,
peer: None,
error,
}
}
};
Poll::Ready(event)
}
Poll::Pending => Poll::Pending,
}
}
// An inbound connection can indicate that we are public; adjust the delay to the next probe.
pub fn on_inbound_connection(&mut self) {
if *self.confidence == self.config.confidence_max {
if self.nat_status.is_public() {
self.schedule_next_probe(self.config.refresh_interval * 2);
} else {
self.schedule_next_probe(self.config.refresh_interval / 5);
}
}
}
pub fn on_new_address(&mut self) {
if !self.nat_status.is_public() {
// New address could be publicly reachable, trigger retry.
if *self.confidence > 0 {
*self.confidence -= 1;
}
self.schedule_next_probe(self.config.retry_interval);
}
}
pub fn on_expired_address(&mut self, addr: &Multiaddr) {
if let NatStatus::Public(public_address) = self.nat_status {
if public_address == addr {
*self.confidence = 0;
*self.nat_status = NatStatus::Unknown;
self.schedule_next_probe(Duration::ZERO);
}
}
}
// Select a random server for the probe.
fn random_server(&mut self) -> Option<PeerId> {
// Update list of throttled servers.
let i = self.throttled_servers.partition_point(|(_, time)| {
*time + self.config.throttle_server_period < Instant::now()
});
self.throttled_servers.drain(..i);
let mut servers: Vec<&PeerId> = self.servers.iter().collect();
if self.config.use_connected {
servers.extend(self.connected.iter().map(|(id, _)| id));
}
servers.retain(|s| !self.throttled_servers.iter().any(|(id, _)| s == &id));
servers.choose(&mut thread_rng()).map(|&&p| p)
}
// Send a dial-request to a randomly selected server.
// Returns the server that is used in this probe.
// `Err` if there are no qualified servers or no addresses.
fn do_probe(
&mut self,
probe_id: ProbeId,
addresses: Vec<Multiaddr>,
) -> Result<PeerId, OutboundProbeError> {
let _ = self.last_probe.insert(Instant::now());
if addresses.is_empty() {
log::debug!("Outbound dial-back request aborted: No dial-back addresses.");
return Err(OutboundProbeError::NoAddresses);
}
let server = match self.random_server() {
Some(s) => s,
None => {
log::debug!("Outbound dial-back request aborted: No qualified server.");
return Err(OutboundProbeError::NoServer);
}
};
let request_id = self.inner.send_request(
&server,
DialRequest {
peer_id: self.local_peer_id,
addresses,
},
);
self.throttled_servers.push((server, Instant::now()));
log::debug!("Send dial-back request to peer {}.", server);
self.ongoing_outbound.insert(request_id, probe_id);
Ok(server)
}
// Set the delay to the next probe based on the time of our last probe
// and the specified delay.
fn schedule_next_probe(&mut self, delay: Duration) {
let last_probe_instant = match self.last_probe {
Some(instant) => instant,
None => {
return;
}
};
let schedule_next = *last_probe_instant + delay;
self.schedule_probe
.reset(schedule_next.saturating_duration_since(Instant::now()));
}
// Adapt current confidence and NAT status to the status reported by the latest probe.
// Return the old status if it flipped.
fn handle_reported_status(&mut self, reported_status: NatStatus) -> Option<NatStatus> {
self.schedule_next_probe(self.config.retry_interval);
if matches!(reported_status, NatStatus::Unknown) {
return None;
}
if reported_status == *self.nat_status {
if *self.confidence < self.config.confidence_max {
*self.confidence += 1;
}
// Delay with (usually longer) refresh-interval.
if *self.confidence >= self.config.confidence_max {
self.schedule_next_probe(self.config.refresh_interval);
}
return None;
}
if reported_status.is_public() && self.nat_status.is_public() {
// Different address than the currently assumed public address was reported.
// Switch address, but don't report as flipped.
*self.nat_status = reported_status;
return None;
}
if *self.confidence > 0 {
// Reduce confidence but keep old status.
*self.confidence -= 1;
return None;
}
log::debug!(
"Flipped assumed NAT status from {:?} to {:?}",
self.nat_status,
reported_status
);
let old_status = self.nat_status.clone();
*self.nat_status = reported_status;
Some(old_status)
}
}
impl From<Result<Multiaddr, ResponseError>> for NatStatus {
fn from(result: Result<Multiaddr, ResponseError>) -> Self {
match result {
Ok(addr) => NatStatus::Public(addr),
Err(ResponseError::DialError) => NatStatus::Private,
_ => NatStatus::Unknown,
}
}
}

View File

@ -0,0 +1,439 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use super::{
Action, AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, ProbeId,
ResponseError,
};
use instant::Instant;
use libp2p_core::{connection::ConnectionId, multiaddr::Protocol, Multiaddr, PeerId};
use libp2p_request_response::{
InboundFailure, RequestId, RequestResponse, RequestResponseEvent, RequestResponseMessage,
ResponseChannel,
};
use libp2p_swarm::{
dial_opts::{DialOpts, PeerCondition},
DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use std::{
collections::{HashMap, HashSet, VecDeque},
num::NonZeroU8,
};
/// Inbound probe failed.
#[derive(Debug, Clone, PartialEq)]
pub enum InboundProbeError {
/// Receiving the dial-back request or sending a response failed.
InboundRequest(InboundFailure),
/// We refused or failed to dial the client.
Response(ResponseError),
}
#[derive(Debug, Clone, PartialEq)]
pub enum InboundProbeEvent {
/// A dial-back request was received from a remote peer.
Request {
probe_id: ProbeId,
/// Peer that sent the request.
peer: PeerId,
/// The addresses that will be attempted to dial.
addresses: Vec<Multiaddr>,
},
/// A dial request to the remote was successful.
Response {
probe_id: ProbeId,
/// Peer to which the response is sent.
peer: PeerId,
address: Multiaddr,
},
/// The inbound request failed, was rejected, or none of the remote's
/// addresses could be dialed.
Error {
probe_id: ProbeId,
/// Peer that sent the dial-back request.
peer: PeerId,
error: InboundProbeError,
},
}
/// View over [`super::Behaviour`] in a server role.
pub struct AsServer<'a> {
pub inner: &'a mut RequestResponse<AutoNatCodec>,
pub config: &'a Config,
pub connected: &'a HashMap<PeerId, HashMap<ConnectionId, Option<Multiaddr>>>,
pub probe_id: &'a mut ProbeId,
pub throttled_clients: &'a mut Vec<(PeerId, Instant)>,
#[allow(clippy::type_complexity)]
pub ongoing_inbound: &'a mut HashMap<
PeerId,
(
ProbeId,
RequestId,
Vec<Multiaddr>,
ResponseChannel<DialResponse>,
),
>,
}
impl<'a> HandleInnerEvent for AsServer<'a> {
fn handle_event(
&mut self,
_params: &mut impl PollParameters,
event: RequestResponseEvent<DialRequest, DialResponse>,
) -> (VecDeque<Event>, Option<Action>) {
let mut events = VecDeque::new();
let mut action = None;
match event {
RequestResponseEvent::Message {
peer,
message:
RequestResponseMessage::Request {
request_id,
request,
channel,
},
} => {
let probe_id = self.probe_id.next();
match self.resolve_inbound_request(peer, request) {
Ok(addrs) => {
log::debug!(
"Inbound dial request from Peer {} with dial-back addresses {:?}.",
peer,
addrs
);
self.ongoing_inbound
.insert(peer, (probe_id, request_id, addrs.clone(), channel));
self.throttled_clients.push((peer, Instant::now()));
events.push_back(Event::InboundProbe(InboundProbeEvent::Request {
probe_id,
peer,
addresses: addrs.clone(),
}));
action = Some(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(peer)
.condition(PeerCondition::Always)
.override_dial_concurrency_factor(NonZeroU8::new(1).expect("1 > 0"))
.addresses(addrs)
.build(),
handler: self.inner.new_handler(),
});
}
Err((status_text, error)) => {
log::debug!(
"Reject inbound dial request from peer {}: {}.",
peer,
status_text
);
let response = DialResponse {
result: Err(error.clone()),
status_text: Some(status_text),
};
let _ = self.inner.send_response(channel, response);
events.push_back(Event::InboundProbe(InboundProbeEvent::Error {
probe_id,
peer,
error: InboundProbeError::Response(error),
}));
}
}
}
RequestResponseEvent::InboundFailure {
peer,
error,
request_id,
} => {
log::debug!(
"Inbound Failure {} when on dial-back request from peer {}.",
error,
peer
);
let probe_id = match self.ongoing_inbound.get(&peer) {
Some((_, rq_id, _, _)) if *rq_id == request_id => {
self.ongoing_inbound.remove(&peer).unwrap().0
}
_ => self.probe_id.next(),
};
events.push_back(Event::InboundProbe(InboundProbeEvent::Error {
probe_id,
peer,
error: InboundProbeError::InboundRequest(error),
}));
}
_ => {}
}
(events, action)
}
}
impl<'a> AsServer<'a> {
pub fn on_outbound_connection(
&mut self,
peer: &PeerId,
address: &Multiaddr,
) -> Option<InboundProbeEvent> {
let (_, _, addrs, _) = self.ongoing_inbound.get(peer)?;
// Check if the dialed address was among the requested addresses.
if !addrs.contains(address) {
return None;
}
log::debug!(
"Dial-back to peer {} succeeded at addr {:?}.",
peer,
address
);
let (probe_id, _, _, channel) = self.ongoing_inbound.remove(peer).unwrap();
let response = DialResponse {
result: Ok(address.clone()),
status_text: None,
};
let _ = self.inner.send_response(channel, response);
Some(InboundProbeEvent::Response {
probe_id,
peer: *peer,
address: address.clone(),
})
}
pub fn on_outbound_dial_error(
&mut self,
peer: Option<PeerId>,
error: &DialError,
) -> Option<InboundProbeEvent> {
let (probe_id, _, _, channel) = peer.and_then(|p| self.ongoing_inbound.remove(&p))?;
log::debug!(
"Dial-back to peer {} failed with error {:?}.",
peer.unwrap(),
error
);
let response_error = ResponseError::DialError;
let response = DialResponse {
result: Err(response_error.clone()),
status_text: Some("dial failed".to_string()),
};
let _ = self.inner.send_response(channel, response);
Some(InboundProbeEvent::Error {
probe_id,
peer: peer.expect("PeerId is present."),
error: InboundProbeError::Response(response_error),
})
}
// Validate the inbound request and collect the addresses to be dialed.
fn resolve_inbound_request(
&mut self,
sender: PeerId,
request: DialRequest,
) -> Result<Vec<Multiaddr>, (String, ResponseError)> {
// Update list of throttled clients.
let i = self.throttled_clients.partition_point(|(_, time)| {
*time + self.config.throttle_clients_period < Instant::now()
});
self.throttled_clients.drain(..i);
if request.peer_id != sender {
let status_text = "peer id mismatch".to_string();
return Err((status_text, ResponseError::BadRequest));
}
if self.ongoing_inbound.contains_key(&sender) {
let status_text = "dial-back already ongoing".to_string();
return Err((status_text, ResponseError::DialRefused));
}
if self.throttled_clients.len() >= self.config.throttle_clients_global_max {
let status_text = "too many total dials".to_string();
return Err((status_text, ResponseError::DialRefused));
}
let throttled_for_client = self
.throttled_clients
.iter()
.filter(|(p, _)| p == &sender)
.count();
if throttled_for_client >= self.config.throttle_clients_peer_max {
let status_text = "too many dials for peer".to_string();
return Err((status_text, ResponseError::DialRefused));
}
// Obtain an observed address from non-relayed connections.
let observed_addr = self
.connected
.get(&sender)
.expect("Peer is connected.")
.values()
.find_map(|a| a.as_ref())
.ok_or_else(|| {
let status_text = "no dial-request over relayed connections".to_string();
(status_text, ResponseError::DialError)
})?;
let mut addrs = Self::filter_valid_addrs(sender, request.addresses, observed_addr);
addrs.truncate(self.config.max_peer_addresses);
if addrs.is_empty() {
let status_text = "no dialable addresses".to_string();
return Err((status_text, ResponseError::DialError));
}
Ok(addrs)
}
// Filter dial addresses and replace demanded ip with the observed one.
fn filter_valid_addrs(
peer: PeerId,
demanded: Vec<Multiaddr>,
observed_remote_at: &Multiaddr,
) -> Vec<Multiaddr> {
// Skip if the observed address is a relay address.
if observed_remote_at.iter().any(|p| p == Protocol::P2pCircuit) {
return Vec::new();
}
let observed_ip = match observed_remote_at
.into_iter()
.find(|p| matches!(p, Protocol::Ip4(_) | Protocol::Ip6(_)))
{
Some(ip) => ip,
None => return Vec::new(),
};
let mut distinct = HashSet::new();
demanded
.into_iter()
.filter_map(|addr| {
// Replace the demanded ip with the observed one.
let i = addr
.iter()
.position(|p| matches!(p, Protocol::Ip4(_) | Protocol::Ip6(_)))?;
let mut addr = addr.replace(i, |_| Some(observed_ip.clone()))?;
let is_valid = addr.iter().all(|proto| match proto {
Protocol::P2pCircuit => false,
Protocol::P2p(hash) => hash == peer.into(),
_ => true,
});
if !is_valid {
return None;
}
if !addr.iter().any(|p| matches!(p, Protocol::P2p(_))) {
addr.push(Protocol::P2p(peer.into()))
}
// Only collect distinct addresses.
distinct.insert(addr.clone()).then(|| addr)
})
.collect()
}
}
#[cfg(test)]
mod test {
use super::*;
use std::net::Ipv4Addr;
fn random_ip<'a>() -> Protocol<'a> {
Protocol::Ip4(Ipv4Addr::new(
rand::random(),
rand::random(),
rand::random(),
rand::random(),
))
}
fn random_port<'a>() -> Protocol<'a> {
Protocol::Tcp(rand::random())
}
#[test]
fn filter_addresses() {
let peer_id = PeerId::random();
let observed_ip = random_ip();
let observed_addr = Multiaddr::empty()
.with(observed_ip.clone())
.with(random_port())
.with(Protocol::P2p(peer_id.into()));
// Valid address with matching peer-id
let demanded_1 = Multiaddr::empty()
.with(random_ip())
.with(random_port())
.with(Protocol::P2p(peer_id.into()));
// Invalid because peer_id does not match
let demanded_2 = Multiaddr::empty()
.with(random_ip())
.with(random_port())
.with(Protocol::P2p(PeerId::random().into()));
// Valid address without peer-id
let demanded_3 = Multiaddr::empty().with(random_ip()).with(random_port());
// Invalid because relayed
let demanded_4 = Multiaddr::empty()
.with(random_ip())
.with(random_port())
.with(Protocol::P2p(PeerId::random().into()))
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(peer_id.into()));
let demanded = vec![
demanded_1.clone(),
demanded_2,
demanded_3.clone(),
demanded_4,
];
let filtered = AsServer::filter_valid_addrs(peer_id, demanded, &observed_addr);
let expected_1 = demanded_1
.replace(0, |_| Some(observed_ip.clone()))
.unwrap();
let expected_2 = demanded_3
.replace(0, |_| Some(observed_ip))
.unwrap()
.with(Protocol::P2p(peer_id.into()));
assert_eq!(filtered, vec![expected_1, expected_2]);
}
#[test]
fn skip_relayed_addr() {
let peer_id = PeerId::random();
let observed_ip = random_ip();
// Observed address is relayed.
let observed_addr = Multiaddr::empty()
.with(observed_ip.clone())
.with(random_port())
.with(Protocol::P2p(PeerId::random().into()))
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(peer_id.into()));
let demanded = Multiaddr::empty()
.with(random_ip())
.with(random_port())
.with(Protocol::P2p(peer_id.into()));
let filtered = AsServer::filter_valid_addrs(peer_id, vec![demanded], &observed_addr);
assert!(filtered.is_empty());
}
}

View File

@ -0,0 +1,36 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Implementation of the AutoNAT protocol.
mod behaviour;
mod protocol;
pub use self::{
behaviour::{
Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, NatStatus,
OutboundProbeError, OutboundProbeEvent, ProbeId,
},
protocol::ResponseError,
};
pub use libp2p_request_response::{InboundFailure, OutboundFailure};
mod structs_proto {
include!(concat!(env!("OUT_DIR"), "/structs.rs"));
}

View File

@ -0,0 +1,333 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::structs_proto;
use async_trait::async_trait;
use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use libp2p_core::{upgrade, Multiaddr, PeerId};
use libp2p_request_response::{ProtocolName, RequestResponseCodec};
use prost::Message;
use std::{convert::TryFrom, io};
#[derive(Clone, Debug)]
pub struct AutoNatProtocol;
impl ProtocolName for AutoNatProtocol {
fn protocol_name(&self) -> &[u8] {
b"/libp2p/autonat/1.0.0"
}
}
#[derive(Clone)]
pub struct AutoNatCodec;
#[async_trait]
impl RequestResponseCodec for AutoNatCodec {
type Protocol = AutoNatProtocol;
type Request = DialRequest;
type Response = DialResponse;
async fn read_request<T>(
&mut self,
_: &AutoNatProtocol,
io: &mut T,
) -> io::Result<Self::Request>
where
T: AsyncRead + Send + Unpin,
{
let bytes = upgrade::read_length_prefixed(io, 1024).await?;
let request = DialRequest::from_bytes(&bytes)?;
Ok(request)
}
async fn read_response<T>(
&mut self,
_: &AutoNatProtocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Send + Unpin,
{
let bytes = upgrade::read_length_prefixed(io, 1024).await?;
let response = DialResponse::from_bytes(&bytes)?;
Ok(response)
}
async fn write_request<T>(
&mut self,
_: &AutoNatProtocol,
io: &mut T,
data: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Send + Unpin,
{
upgrade::write_length_prefixed(io, data.into_bytes()).await?;
io.close().await
}
async fn write_response<T>(
&mut self,
_: &AutoNatProtocol,
io: &mut T,
data: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Send + Unpin,
{
upgrade::write_length_prefixed(io, data.into_bytes()).await?;
io.close().await
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DialRequest {
pub peer_id: PeerId,
pub addresses: Vec<Multiaddr>,
}
impl DialRequest {
pub fn from_bytes(bytes: &[u8]) -> Result<Self, io::Error> {
let msg = structs_proto::Message::decode(bytes)
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
if msg.r#type != Some(structs_proto::message::MessageType::Dial as _) {
return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid type"));
}
let (peer_id, addrs) = if let Some(structs_proto::message::Dial {
peer:
Some(structs_proto::message::PeerInfo {
id: Some(peer_id),
addrs,
}),
}) = msg.dial
{
(peer_id, addrs)
} else {
log::debug!("Received malformed dial message.");
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid dial message",
));
};
let peer_id = {
PeerId::try_from(peer_id)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid peer id"))?
};
let addrs = {
let mut maddrs = vec![];
for addr in addrs.into_iter() {
let maddr = Multiaddr::try_from(addr)
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
maddrs.push(maddr);
}
maddrs
};
Ok(Self {
peer_id,
addresses: addrs,
})
}
pub fn into_bytes(self) -> Vec<u8> {
let peer_id = self.peer_id.to_bytes();
let addrs = self
.addresses
.into_iter()
.map(|addr| addr.to_vec())
.collect();
let msg = structs_proto::Message {
r#type: Some(structs_proto::message::MessageType::Dial as _),
dial: Some(structs_proto::message::Dial {
peer: Some(structs_proto::message::PeerInfo {
id: Some(peer_id),
addrs,
}),
}),
dial_response: None,
};
let mut bytes = Vec::with_capacity(msg.encoded_len());
msg.encode(&mut bytes)
.expect("Vec<u8> provides capacity as needed");
bytes
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ResponseError {
DialError,
DialRefused,
BadRequest,
InternalError,
}
impl From<ResponseError> for i32 {
fn from(t: ResponseError) -> Self {
match t {
ResponseError::DialError => 100,
ResponseError::DialRefused => 101,
ResponseError::BadRequest => 200,
ResponseError::InternalError => 300,
}
}
}
impl TryFrom<structs_proto::message::ResponseStatus> for ResponseError {
type Error = io::Error;
fn try_from(value: structs_proto::message::ResponseStatus) -> Result<Self, Self::Error> {
match value {
structs_proto::message::ResponseStatus::EDialError => Ok(ResponseError::DialError),
structs_proto::message::ResponseStatus::EDialRefused => Ok(ResponseError::DialRefused),
structs_proto::message::ResponseStatus::EBadRequest => Ok(ResponseError::BadRequest),
structs_proto::message::ResponseStatus::EInternalError => {
Ok(ResponseError::InternalError)
}
structs_proto::message::ResponseStatus::Ok => {
log::debug!("Received response with status code OK but expected error.");
Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid response error type",
))
}
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DialResponse {
pub status_text: Option<String>,
pub result: Result<Multiaddr, ResponseError>,
}
impl DialResponse {
pub fn from_bytes(bytes: &[u8]) -> Result<Self, io::Error> {
let msg = structs_proto::Message::decode(bytes)
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
if msg.r#type != Some(structs_proto::message::MessageType::DialResponse as _) {
return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid type"));
}
Ok(match msg.dial_response {
Some(structs_proto::message::DialResponse {
status: Some(status),
status_text,
addr: Some(addr),
}) if structs_proto::message::ResponseStatus::from_i32(status)
== Some(structs_proto::message::ResponseStatus::Ok) =>
{
let addr = Multiaddr::try_from(addr)
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
Self {
status_text,
result: Ok(addr),
}
}
Some(structs_proto::message::DialResponse {
status: Some(status),
status_text,
addr: None,
}) => Self {
status_text,
result: Err(ResponseError::try_from(
structs_proto::message::ResponseStatus::from_i32(status).ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidData, "invalid response status code")
})?,
)?),
},
_ => {
log::debug!("Received malformed response message.");
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid dial response message",
));
}
})
}
pub fn into_bytes(self) -> Vec<u8> {
let dial_response = match self.result {
Ok(addr) => structs_proto::message::DialResponse {
status: Some(0),
status_text: self.status_text,
addr: Some(addr.to_vec()),
},
Err(error) => structs_proto::message::DialResponse {
status: Some(error.into()),
status_text: self.status_text,
addr: None,
},
};
let msg = structs_proto::Message {
r#type: Some(structs_proto::message::MessageType::DialResponse as _),
dial: None,
dial_response: Some(dial_response),
};
let mut bytes = Vec::with_capacity(msg.encoded_len());
msg.encode(&mut bytes)
.expect("Vec<u8> provides capacity as needed");
bytes
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_request_encode_decode() {
let request = DialRequest {
peer_id: PeerId::random(),
addresses: vec![
"/ip4/8.8.8.8/tcp/30333".parse().unwrap(),
"/ip4/192.168.1.42/tcp/30333".parse().unwrap(),
],
};
let bytes = request.clone().into_bytes();
let request2 = DialRequest::from_bytes(&bytes).unwrap();
assert_eq!(request, request2);
}
#[test]
fn test_response_ok_encode_decode() {
let response = DialResponse {
result: Ok("/ip4/8.8.8.8/tcp/30333".parse().unwrap()),
status_text: None,
};
let bytes = response.clone().into_bytes();
let response2 = DialResponse::from_bytes(&bytes).unwrap();
assert_eq!(response, response2);
}
#[test]
fn test_response_err_encode_decode() {
let response = DialResponse {
result: Err(ResponseError::DialError),
status_text: Some("dial failed".to_string()),
};
let bytes = response.clone().into_bytes();
let response2 = DialResponse::from_bytes(&bytes).unwrap();
assert_eq!(response, response2);
}
}

View File

@ -0,0 +1,37 @@
syntax = "proto2";
package structs;
message Message {
enum MessageType {
DIAL = 0;
DIAL_RESPONSE = 1;
}
enum ResponseStatus {
OK = 0;
E_DIAL_ERROR = 100;
E_DIAL_REFUSED = 101;
E_BAD_REQUEST = 200;
E_INTERNAL_ERROR = 300;
}
message PeerInfo {
optional bytes id = 1;
repeated bytes addrs = 2;
}
message Dial {
optional PeerInfo peer = 1;
}
message DialResponse {
optional ResponseStatus status = 1;
optional string statusText = 2;
optional bytes addr = 3;
}
optional MessageType type = 1;
optional Dial dial = 2;
optional DialResponse dialResponse = 3;
}

View File

@ -0,0 +1,513 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{channel::oneshot, Future, FutureExt, StreamExt};
use futures_timer::Delay;
use libp2p::{
development_transport,
identity::Keypair,
swarm::{AddressScore, Swarm, SwarmEvent},
Multiaddr, PeerId,
};
use libp2p_autonat::{
Behaviour, Config, Event, NatStatus, OutboundProbeError, OutboundProbeEvent, ResponseError,
};
use std::time::Duration;
const MAX_CONFIDENCE: usize = 3;
const TEST_RETRY_INTERVAL: Duration = Duration::from_secs(1);
const TEST_REFRESH_INTERVAL: Duration = Duration::from_secs(2);
async fn init_swarm(config: Config) -> Swarm<Behaviour> {
let keypair = Keypair::generate_ed25519();
let local_id = PeerId::from_public_key(&keypair.public());
let transport = development_transport(keypair).await.unwrap();
let behaviour = Behaviour::new(local_id, config);
Swarm::new(transport, behaviour, local_id)
}
async fn spawn_server(kill: oneshot::Receiver<()>) -> (PeerId, Multiaddr) {
let (tx, rx) = oneshot::channel();
async_std::task::spawn(async move {
let mut server = init_swarm(Config {
boot_delay: Duration::from_secs(60),
throttle_clients_peer_max: usize::MAX,
..Default::default()
})
.await;
let peer_id = *server.local_peer_id();
server
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
let addr = loop {
match server.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => break address,
_ => {}
};
};
tx.send((peer_id, addr)).unwrap();
let mut kill = kill.fuse();
loop {
futures::select! {
_ = server.select_next_some() => {},
_ = kill => return,
}
}
});
rx.await.unwrap()
}
async fn next_event(swarm: &mut Swarm<Behaviour>) -> Event {
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(event) => {
break event;
}
_ => {}
}
}
}
async fn run_test_with_timeout(test: impl Future) {
futures::select! {
_ = test.fuse() => {},
_ = Delay::new(Duration::from_secs(60)).fuse() => panic!("test timed out")
}
}
#[async_std::test]
async fn test_auto_probe() {
let test = async {
let mut client = init_swarm(Config {
retry_interval: TEST_RETRY_INTERVAL,
refresh_interval: TEST_REFRESH_INTERVAL,
confidence_max: MAX_CONFIDENCE,
throttle_server_period: Duration::ZERO,
boot_delay: Duration::ZERO,
..Default::default()
})
.await;
let (_handle, rx) = oneshot::channel();
let (server_id, addr) = spawn_server(rx).await;
client.behaviour_mut().add_server(server_id, Some(addr));
// Initial status should be unknown.
assert_eq!(client.behaviour().nat_status(), NatStatus::Unknown);
assert!(client.behaviour().public_address().is_none());
assert_eq!(client.behaviour().confidence(), 0);
// Test no listening addresses
match next_event(&mut client).await {
Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => {
assert!(peer.is_none());
assert_eq!(error, OutboundProbeError::NoAddresses);
}
other => panic!("Unexpected Event: {:?}", other),
}
assert_eq!(client.behaviour().nat_status(), NatStatus::Unknown);
assert!(client.behaviour().public_address().is_none());
assert_eq!(client.behaviour().confidence(), 0);
// Test Private NAT Status
// Artificially add a faulty address.
let unreachable_addr: Multiaddr = "/ip4/127.0.0.1/tcp/42".parse().unwrap();
client.add_external_address(unreachable_addr.clone(), AddressScore::Infinite);
let id = match next_event(&mut client).await {
Event::OutboundProbe(OutboundProbeEvent::Request { probe_id, peer }) => {
assert_eq!(peer, server_id);
probe_id
}
other => panic!("Unexpected Event: {:?}", other),
};
match next_event(&mut client).await {
Event::OutboundProbe(OutboundProbeEvent::Error {
probe_id,
peer,
error,
}) => {
assert_eq!(peer.unwrap(), server_id);
assert_eq!(probe_id, id);
assert_eq!(
error,
OutboundProbeError::Response(ResponseError::DialError)
);
}
other => panic!("Unexpected Event: {:?}", other),
}
match next_event(&mut client).await {
Event::StatusChanged { old, new } => {
assert_eq!(old, NatStatus::Unknown);
assert_eq!(new, NatStatus::Private);
}
other => panic!("Unexpected Event: {:?}", other),
}
assert_eq!(client.behaviour().confidence(), 0);
assert_eq!(client.behaviour().nat_status(), NatStatus::Private);
assert!(client.behaviour().public_address().is_none());
// Test new public listening address
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
loop {
match client.select_next_some().await {
SwarmEvent::NewListenAddr { .. } => break,
_ => {}
}
}
let id = match next_event(&mut client).await {
Event::OutboundProbe(OutboundProbeEvent::Request { probe_id, peer }) => {
assert_eq!(peer, server_id);
probe_id
}
other => panic!("Unexpected Event: {:?}", other),
};
// Expect inbound dial from server.
loop {
match client.select_next_some().await {
SwarmEvent::ConnectionEstablished {
endpoint, peer_id, ..
} if endpoint.is_listener() => {
assert_eq!(peer_id, server_id);
break;
}
SwarmEvent::IncomingConnection { .. } | SwarmEvent::NewListenAddr { .. } => {}
_ => panic!("Unexpected Swarm Event"),
}
}
match next_event(&mut client).await {
Event::OutboundProbe(OutboundProbeEvent::Response { probe_id, peer, .. }) => {
assert_eq!(peer, server_id);
assert_eq!(probe_id, id);
}
other => panic!("Unexpected Event: {:?}", other),
}
// Expect to flip status to public
match next_event(&mut client).await {
Event::StatusChanged { old, new } => {
assert_eq!(old, NatStatus::Private);
assert!(matches!(new, NatStatus::Public(_)));
assert!(new.is_public());
}
other => panic!("Unexpected Event: {:?}", other),
}
assert_eq!(client.behaviour().confidence(), 0);
assert!(client.behaviour().nat_status().is_public());
assert!(client.behaviour().public_address().is_some());
drop(_handle);
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_confidence() {
let test = async {
let mut client = init_swarm(Config {
retry_interval: TEST_RETRY_INTERVAL,
refresh_interval: TEST_REFRESH_INTERVAL,
confidence_max: MAX_CONFIDENCE,
throttle_server_period: Duration::ZERO,
boot_delay: Duration::from_millis(100),
..Default::default()
})
.await;
let (_handle, rx) = oneshot::channel();
let (server_id, addr) = spawn_server(rx).await;
client.behaviour_mut().add_server(server_id, Some(addr));
// Randomly test either for public or for private status the confidence.
let test_public = rand::random::<bool>();
if test_public {
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
loop {
match client.select_next_some().await {
SwarmEvent::NewListenAddr { .. } => break,
_ => {}
}
}
} else {
let unreachable_addr: Multiaddr = "/ip4/127.0.0.1/tcp/42".parse().unwrap();
client.add_external_address(unreachable_addr.clone(), AddressScore::Infinite);
}
for i in 0..MAX_CONFIDENCE + 1 {
let id = match next_event(&mut client).await {
Event::OutboundProbe(OutboundProbeEvent::Request { probe_id, peer }) => {
assert_eq!(peer, server_id);
probe_id
}
other => panic!("Unexpected Event: {:?}", other),
};
match next_event(&mut client).await {
Event::OutboundProbe(event) => {
let (peer, probe_id) = match event {
OutboundProbeEvent::Response { probe_id, peer, .. } if test_public => {
(peer, probe_id)
}
OutboundProbeEvent::Error {
probe_id,
peer,
error,
} if !test_public => {
assert_eq!(
error,
OutboundProbeError::Response(ResponseError::DialError)
);
(peer.unwrap(), probe_id)
}
other => panic!("Unexpected Outbound Event: {:?}", other),
};
assert_eq!(peer, server_id);
assert_eq!(probe_id, id);
}
other => panic!("Unexpected Event: {:?}", other),
}
// Confidence should increase each iteration up to MAX_CONFIDENCE
let expect_confidence = if i <= MAX_CONFIDENCE {
i
} else {
MAX_CONFIDENCE
};
assert_eq!(client.behaviour().confidence(), expect_confidence);
assert_eq!(client.behaviour().nat_status().is_public(), test_public);
// Expect status to flip after first probe
if i == 0 {
match next_event(&mut client).await {
Event::StatusChanged { old, new } => {
assert_eq!(old, NatStatus::Unknown);
assert_eq!(new.is_public(), test_public);
}
other => panic!("Unexpected Event: {:?}", other),
}
}
}
drop(_handle);
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_throttle_server_period() {
let test = async {
let mut client = init_swarm(Config {
retry_interval: TEST_RETRY_INTERVAL,
refresh_interval: TEST_REFRESH_INTERVAL,
confidence_max: MAX_CONFIDENCE,
// Throttle servers so they can not be re-used for dial request.
throttle_server_period: Duration::from_secs(1000),
boot_delay: Duration::from_millis(100),
..Default::default()
})
.await;
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
loop {
match client.select_next_some().await {
SwarmEvent::NewListenAddr { .. } => break,
_ => {}
}
}
let (_handle, rx) = oneshot::channel();
let (id, addr) = spawn_server(rx).await;
client.behaviour_mut().add_server(id, Some(addr));
// First probe should be successful and flip status to public.
loop {
match next_event(&mut client).await {
Event::StatusChanged { old, new } => {
assert_eq!(old, NatStatus::Unknown);
assert!(new.is_public());
break;
}
Event::OutboundProbe(OutboundProbeEvent::Request { .. }) => {}
Event::OutboundProbe(OutboundProbeEvent::Response { .. }) => {}
other => panic!("Unexpected Event: {:?}", other),
}
}
assert_eq!(client.behaviour().confidence(), 0);
// Expect following probe to fail because server is throttled
match next_event(&mut client).await {
Event::OutboundProbe(OutboundProbeEvent::Error { peer, error, .. }) => {
assert!(peer.is_none());
assert_eq!(error, OutboundProbeError::NoServer);
}
other => panic!("Unexpected Event: {:?}", other),
}
assert_eq!(client.behaviour().confidence(), 0);
drop(_handle)
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_use_connected_as_server() {
let test = async {
let mut client = init_swarm(Config {
retry_interval: TEST_RETRY_INTERVAL,
refresh_interval: TEST_REFRESH_INTERVAL,
confidence_max: MAX_CONFIDENCE,
throttle_server_period: Duration::ZERO,
boot_delay: Duration::from_millis(100),
..Default::default()
})
.await;
let (_handle, rx) = oneshot::channel();
let (server_id, addr) = spawn_server(rx).await;
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
client.dial(addr).unwrap();
// await connection
loop {
if let SwarmEvent::ConnectionEstablished { peer_id, .. } =
client.select_next_some().await
{
assert_eq!(peer_id, server_id);
break;
}
}
match next_event(&mut client).await {
Event::OutboundProbe(OutboundProbeEvent::Request { peer, .. }) => {
assert_eq!(peer, server_id);
}
other => panic!("Unexpected Event: {:?}", other),
}
match next_event(&mut client).await {
Event::OutboundProbe(OutboundProbeEvent::Response { peer, .. }) => {
assert_eq!(peer, server_id);
}
other => panic!("Unexpected Event: {:?}", other),
}
drop(_handle);
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_outbound_failure() {
let test = async {
let mut servers = Vec::new();
let mut client = init_swarm(Config {
retry_interval: TEST_RETRY_INTERVAL,
refresh_interval: TEST_REFRESH_INTERVAL,
confidence_max: MAX_CONFIDENCE,
throttle_server_period: Duration::ZERO,
boot_delay: Duration::from_millis(100),
..Default::default()
})
.await;
for _ in 0..5 {
let (tx, rx) = oneshot::channel();
let (id, addr) = spawn_server(rx).await;
client.behaviour_mut().add_server(id, Some(addr));
servers.push((id, tx));
}
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
loop {
match client.select_next_some().await {
SwarmEvent::NewListenAddr { .. } => break,
_ => {}
}
}
// First probe should be successful and flip status to public.
loop {
match next_event(&mut client).await {
Event::StatusChanged { old, new } => {
assert_eq!(old, NatStatus::Unknown);
assert!(new.is_public());
break;
}
Event::OutboundProbe(OutboundProbeEvent::Request { .. }) => {}
Event::OutboundProbe(OutboundProbeEvent::Response { .. }) => {}
other => panic!("Unexpected Event: {:?}", other),
}
}
let inactive = servers.split_off(1);
// Drop the handles of the inactive servers to kill them.
let inactive_ids: Vec<_> = inactive.into_iter().map(|(id, _handle)| id).collect();
// Expect to retry on outbound failure
loop {
match next_event(&mut client).await {
Event::OutboundProbe(OutboundProbeEvent::Request { .. }) => {}
Event::OutboundProbe(OutboundProbeEvent::Response { peer, .. }) => {
assert_eq!(peer, servers[0].0);
break;
}
Event::OutboundProbe(OutboundProbeEvent::Error {
peer: Some(peer),
error: OutboundProbeError::OutboundRequest(_),
..
}) => {
assert!(inactive_ids.contains(&peer));
}
other => panic!("Unexpected Event: {:?}", other),
}
}
};
run_test_with_timeout(test).await;
}

View File

@ -0,0 +1,421 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{channel::oneshot, Future, FutureExt, StreamExt};
use futures_timer::Delay;
use libp2p::{
development_transport,
identity::Keypair,
multiaddr::Protocol,
swarm::{AddressScore, Swarm, SwarmEvent},
Multiaddr, PeerId,
};
use libp2p_autonat::{
Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, ResponseError,
};
use libp2p_core::ConnectedPoint;
use libp2p_swarm::DialError;
use std::{num::NonZeroU32, time::Duration};
async fn init_swarm(config: Config) -> Swarm<Behaviour> {
let keypair = Keypair::generate_ed25519();
let local_id = PeerId::from_public_key(&keypair.public());
let transport = development_transport(keypair).await.unwrap();
let behaviour = Behaviour::new(local_id, config);
Swarm::new(transport, behaviour, local_id)
}
async fn init_server(config: Option<Config>) -> (Swarm<Behaviour>, PeerId, Multiaddr) {
let mut config = config.unwrap_or_default();
// Don't do any outbound probes.
config.boot_delay = Duration::from_secs(60);
let mut server = init_swarm(config).await;
let peer_id = *server.local_peer_id();
server
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
let addr = loop {
match server.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => break address,
_ => {}
};
};
(server, peer_id, addr)
}
async fn spawn_client(
listen: bool,
add_dummy_external_addr: bool,
server_id: PeerId,
server_addr: Multiaddr,
kill: oneshot::Receiver<()>,
) -> (PeerId, Option<Multiaddr>) {
let (tx, rx) = oneshot::channel();
async_std::task::spawn(async move {
let mut client = init_swarm(Config {
boot_delay: Duration::from_millis(100),
refresh_interval: Duration::from_millis(100),
retry_interval: Duration::from_millis(200),
throttle_server_period: Duration::ZERO,
..Default::default()
})
.await;
client
.behaviour_mut()
.add_server(server_id, Some(server_addr));
let peer_id = *client.local_peer_id();
let mut addr = None;
if listen {
client
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
loop {
match client.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => {
addr = Some(address);
break;
}
_ => {}
};
}
}
if add_dummy_external_addr {
let dummy_addr: Multiaddr = "/ip4/127.0.0.1/tcp/42".parse().unwrap();
client.add_external_address(dummy_addr, AddressScore::Infinite);
}
tx.send((peer_id, addr)).unwrap();
let mut kill = kill.fuse();
loop {
futures::select! {
_ = client.select_next_some() => {},
_ = kill => return,
}
}
});
rx.await.unwrap()
}
async fn next_event(swarm: &mut Swarm<Behaviour>) -> Event {
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(event) => {
break event;
}
_ => {}
}
}
}
async fn run_test_with_timeout(test: impl Future) {
futures::select! {
_ = test.fuse() => {},
_ = Delay::new(Duration::from_secs(60)).fuse() => panic!("test timed out")
}
}
#[async_std::test]
async fn test_dial_back() {
let test = async {
let (mut server, server_id, server_addr) = init_server(None).await;
let (_handle, rx) = oneshot::channel();
let (client_id, client_addr) = spawn_client(true, false, server_id, server_addr, rx).await;
let client_port = client_addr
.unwrap()
.into_iter()
.find_map(|p| match p {
Protocol::Tcp(port) => Some(port),
_ => None,
})
.unwrap();
let observed_client_ip = loop {
match server.select_next_some().await {
SwarmEvent::ConnectionEstablished {
peer_id,
endpoint:
ConnectedPoint::Listener {
mut send_back_addr, ..
},
..
} => {
assert_eq!(peer_id, client_id);
let observed_client_ip = loop {
match send_back_addr.pop().unwrap() {
Protocol::Ip4(ip4_addr) => break ip4_addr,
_ => {}
}
};
break observed_client_ip;
}
SwarmEvent::IncomingConnection { .. } | SwarmEvent::NewListenAddr { .. } => {}
_ => panic!("Unexpected Swarm Event"),
}
};
let expect_addr = Multiaddr::empty()
.with(Protocol::Ip4(observed_client_ip))
.with(Protocol::Tcp(client_port))
.with(Protocol::P2p(client_id.into()));
let request_probe_id = match next_event(&mut server).await {
Event::InboundProbe(InboundProbeEvent::Request {
peer,
addresses,
probe_id,
}) => {
assert_eq!(peer, client_id);
assert_eq!(addresses.len(), 1);
assert_eq!(addresses[0], expect_addr);
probe_id
}
other => panic!("Unexpected Event: {:?}", other),
};
loop {
match server.select_next_some().await {
SwarmEvent::ConnectionEstablished {
peer_id,
endpoint: ConnectedPoint::Dialer { address },
num_established,
concurrent_dial_errors,
} => {
assert_eq!(peer_id, client_id);
assert_eq!(num_established, NonZeroU32::new(2).unwrap());
assert!(concurrent_dial_errors.unwrap().is_empty());
assert_eq!(address, expect_addr);
break;
}
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } => {}
_ => panic!("Unexpected Swarm Event"),
}
}
match next_event(&mut server).await {
Event::InboundProbe(InboundProbeEvent::Response {
probe_id,
peer,
address,
}) => {
assert_eq!(probe_id, request_probe_id);
assert_eq!(peer, client_id);
assert_eq!(address, expect_addr);
}
other => panic!("Unexpected Event: {:?}", other),
}
drop(_handle);
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_dial_error() {
let test = async {
let (mut server, server_id, server_addr) = init_server(None).await;
let (_handle, rx) = oneshot::channel();
let (client_id, _) = spawn_client(false, true, server_id, server_addr, rx).await;
let request_probe_id = match next_event(&mut server).await {
Event::InboundProbe(InboundProbeEvent::Request { peer, probe_id, .. }) => {
assert_eq!(peer, client_id);
probe_id
}
other => panic!("Unexpected Event: {:?}", other),
};
loop {
match server.select_next_some().await {
SwarmEvent::OutgoingConnectionError { peer_id, error } => {
assert_eq!(peer_id.unwrap(), client_id);
assert!(matches!(error, DialError::Transport(_)));
break;
}
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } => {}
_ => panic!("Unexpected Swarm Event"),
}
}
match next_event(&mut server).await {
Event::InboundProbe(InboundProbeEvent::Error {
probe_id,
peer,
error,
}) => {
assert_eq!(probe_id, request_probe_id);
assert_eq!(peer, client_id);
assert_eq!(error, InboundProbeError::Response(ResponseError::DialError));
}
other => panic!("Unexpected Event: {:?}", other),
}
drop(_handle);
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_throttle_global_max() {
let test = async {
let (mut server, server_id, server_addr) = init_server(Some(Config {
throttle_clients_global_max: 1,
throttle_clients_period: Duration::from_secs(60),
..Default::default()
}))
.await;
let mut _handles = Vec::new();
for _ in 0..2 {
let (_handle, rx) = oneshot::channel();
spawn_client(true, false, server_id, server_addr.clone(), rx).await;
_handles.push(_handle);
}
let (first_probe_id, first_peer_id) = match next_event(&mut server).await {
Event::InboundProbe(InboundProbeEvent::Request { peer, probe_id, .. }) => {
(probe_id, peer)
}
other => panic!("Unexpected Event: {:?}", other),
};
loop {
match next_event(&mut server).await {
Event::InboundProbe(InboundProbeEvent::Error {
peer,
probe_id,
error: InboundProbeError::Response(ResponseError::DialRefused),
}) => {
assert_ne!(first_peer_id, peer);
assert_ne!(first_probe_id, probe_id);
break;
}
Event::InboundProbe(InboundProbeEvent::Response { peer, probe_id, .. }) => {
assert_eq!(first_peer_id, peer);
assert_eq!(first_probe_id, probe_id);
}
other => panic!("Unexpected Event: {:?}", other),
};
}
drop(_handles);
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_throttle_peer_max() {
let test = async {
let (mut server, server_id, server_addr) = init_server(Some(Config {
throttle_clients_peer_max: 1,
throttle_clients_period: Duration::from_secs(60),
..Default::default()
}))
.await;
let (_handle, rx) = oneshot::channel();
let (client_id, _) = spawn_client(true, false, server_id, server_addr.clone(), rx).await;
let first_probe_id = match next_event(&mut server).await {
Event::InboundProbe(InboundProbeEvent::Request { peer, probe_id, .. }) => {
assert_eq!(client_id, peer);
probe_id
}
other => panic!("Unexpected Event: {:?}", other),
};
match next_event(&mut server).await {
Event::InboundProbe(InboundProbeEvent::Response { peer, probe_id, .. }) => {
assert_eq!(peer, client_id);
assert_eq!(probe_id, first_probe_id);
}
other => panic!("Unexpected Event: {:?}", other),
}
match next_event(&mut server).await {
Event::InboundProbe(InboundProbeEvent::Error {
peer,
probe_id,
error,
}) => {
assert_eq!(client_id, peer);
assert_ne!(first_probe_id, probe_id);
assert_eq!(
error,
InboundProbeError::Response(ResponseError::DialRefused)
)
}
other => panic!("Unexpected Event: {:?}", other),
};
drop(_handle);
};
run_test_with_timeout(test).await;
}
#[async_std::test]
async fn test_dial_multiple_addr() {
let test = async {
let (mut server, server_id, server_addr) = init_server(Some(Config {
throttle_clients_peer_max: 1,
throttle_clients_period: Duration::from_secs(60),
..Default::default()
}))
.await;
let (_handle, rx) = oneshot::channel();
let (client_id, _) = spawn_client(true, true, server_id, server_addr.clone(), rx).await;
let dial_addresses = match next_event(&mut server).await {
Event::InboundProbe(InboundProbeEvent::Request {
peer, addresses, ..
}) => {
assert_eq!(addresses.len(), 2);
assert_eq!(client_id, peer);
addresses
}
other => panic!("Unexpected Event: {:?}", other),
};
loop {
match server.select_next_some().await {
SwarmEvent::ConnectionEstablished {
peer_id,
endpoint: ConnectedPoint::Dialer { address },
concurrent_dial_errors,
..
} => {
assert_eq!(peer_id, client_id);
let dial_errors = concurrent_dial_errors.unwrap();
assert_eq!(dial_errors.len(), 1);
assert_eq!(dial_errors[0].0, dial_addresses[0]);
assert_eq!(address, dial_addresses[1]);
break;
}
SwarmEvent::Dialing(peer) => assert_eq!(peer, client_id),
SwarmEvent::NewListenAddr { .. } => {}
_ => panic!("Unexpected Swarm Event"),
}
}
};
run_test_with_timeout(test).await;
}

View File

@ -40,6 +40,10 @@ pub use libp2p_core::multihash;
#[doc(inline)]
pub use multiaddr;
#[cfg(feature = "autonat")]
#[cfg_attr(docsrs, doc(cfg(feature = "autonat")))]
#[doc(inline)]
pub use libp2p_autonat as autonat;
#[doc(inline)]
pub use libp2p_core as core;
#[cfg(feature = "deflate")]