mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-05-07 16:42:16 +00:00
Allows `NetworkBehaviour` implementations to dial a peer, but instruct the dialed connection to be upgraded as if it were the listening endpoint. This is needed when establishing direct connections through NATs and/or Firewalls (hole punching). When hole punching via TCP (QUIC is different but similar) both ends dial the other at the same time resulting in a simultaneously opened TCP connection. To disambiguate who is the dialer and who the listener there are two options: 1. Use the Simultaneous Open Extension of Multistream Select. See [sim-open] specification and [sim-open-rust] Rust implementation. 2. Disambiguate the role (dialer or listener) based on the role within the DCUtR [dcutr] protocol. More specifically the node initiating the DCUtR process will act as a listener and the other as a dialer. This commit enables (2), i.e. enables the DCUtR protocol to specify the role used once the connection is established. While on the positive side (2) requires one round trip less than (1), on the negative side (2) only works for coordinated simultaneous dials. I.e. when a simultaneous dial happens by chance, and not coordinated via DCUtR, the connection attempt fails when only (2) is in place. [sim-open]: https://github.com/libp2p/specs/blob/master/connections/simopen.md [sim-open-rust]: https://github.com/libp2p/rust-libp2p/pull/2066 [dcutr]: https://github.com/libp2p/specs/blob/master/relay/DCUtR.md
1385 lines
51 KiB
Rust
1385 lines
51 KiB
Rust
// Copyright 2021 Parity Technologies (UK) Ltd.
|
|
//
|
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
// copy of this software and associated documentation files (the "Software"),
|
|
// to deal in the Software without restriction, including without limitation
|
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
// and/or sell copies of the Software, and to permit persons to whom the
|
|
// Software is furnished to do so, subject to the following conditions:
|
|
//
|
|
// The above copyright notice and this permission notice shall be included in
|
|
// all copies or substantial portions of the Software.
|
|
//
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
// DEALINGS IN THE SOFTWARE.
|
|
|
|
use futures::executor::LocalPool;
|
|
use futures::future::FutureExt;
|
|
use futures::stream::{Stream, StreamExt};
|
|
use futures::task::Spawn;
|
|
use libp2p::kad::record::store::MemoryStore;
|
|
use libp2p::NetworkBehaviour;
|
|
use libp2p_core::connection::ConnectedPoint;
|
|
use libp2p_core::either::EitherTransport;
|
|
use libp2p_core::multiaddr::{Multiaddr, Protocol};
|
|
use libp2p_core::transport::{MemoryTransport, Transport, TransportError};
|
|
use libp2p_core::{identity, upgrade, PeerId};
|
|
use libp2p_identify::{Identify, IdentifyConfig, IdentifyEvent, IdentifyInfo};
|
|
use libp2p_kad::{GetClosestPeersOk, Kademlia, KademliaEvent, QueryResult};
|
|
use libp2p_ping as ping;
|
|
use libp2p_plaintext::PlainText2Config;
|
|
use libp2p_relay::v1::{new_transport_and_behaviour, Relay, RelayConfig};
|
|
use libp2p_swarm::protocols_handler::KeepAlive;
|
|
use libp2p_swarm::{
|
|
DialError, DummyBehaviour, NetworkBehaviour, NetworkBehaviourAction,
|
|
NetworkBehaviourEventProcess, PollParameters, Swarm, SwarmEvent,
|
|
};
|
|
use std::task::{Context, Poll};
|
|
use std::time::Duration;
|
|
use void::Void;
|
|
|
|
#[test]
|
|
fn src_connect_to_dst_listening_via_relay() {
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut pool = LocalPool::new();
|
|
|
|
let mut src_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive);
|
|
let mut dst_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive);
|
|
let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
|
|
let src_peer_id = *src_swarm.local_peer_id();
|
|
let dst_peer_id = *dst_swarm.local_peer_id();
|
|
let relay_peer_id = *relay_swarm.local_peer_id();
|
|
|
|
let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::<u64>()));
|
|
let dst_listen_addr_via_relay = relay_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_peer_id.into()))
|
|
.with(Protocol::P2pCircuit);
|
|
let dst_addr_via_relay = dst_listen_addr_via_relay
|
|
.clone()
|
|
.with(Protocol::P2p(dst_peer_id.into()));
|
|
|
|
relay_swarm.listen_on(relay_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, relay_swarm);
|
|
|
|
let dst_listener = dst_swarm
|
|
.listen_on(dst_listen_addr_via_relay.clone())
|
|
.unwrap();
|
|
|
|
pool.run_until(async {
|
|
// Destination Node dialing Relay.
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(peer_id) => assert_eq!(peer_id, relay_peer_id),
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Destination Node establishing connection to Relay.
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
|
|
assert_eq!(peer_id, relay_peer_id);
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Destination Node reporting listen address via relay.
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::NewListenAddr {
|
|
address,
|
|
listener_id,
|
|
} if listener_id == dst_listener => {
|
|
assert_eq!(address, dst_listen_addr_via_relay);
|
|
break;
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
|
|
..
|
|
})) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
let dst = async move {
|
|
// Destination Node receiving connection from Source Node via Relay.
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::IncomingConnection { send_back_addr, .. } => {
|
|
assert_eq!(
|
|
send_back_addr,
|
|
Protocol::P2p(src_peer_id.clone().into()).into()
|
|
);
|
|
break;
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
// Destination Node establishing connection from Source Node via Relay.
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == src_peer_id => {
|
|
break;
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
// Destination Node waiting for Ping from Source Node via Relay.
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
SwarmEvent::ConnectionClosed { peer_id, .. } => {
|
|
assert_eq!(peer_id, src_peer_id);
|
|
break;
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
};
|
|
|
|
src_swarm.dial(dst_addr_via_relay).unwrap();
|
|
let src = async move {
|
|
// Source Node dialing Relay to connect to Destination Node.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Source Node establishing connection to Relay to connect to Destination Node.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Source Node establishing connection to destination node via Relay.
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {
|
|
break
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
// Source Node waiting for Ping from Destination Node via Relay.
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(ping::Event {
|
|
peer,
|
|
result: Ok(_),
|
|
})) => {
|
|
if peer == dst_peer_id {
|
|
break;
|
|
}
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
};
|
|
|
|
futures::future::join(dst, src).await
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn src_connect_to_dst_not_listening_via_active_relay() {
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut pool = LocalPool::new();
|
|
|
|
let mut src_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive);
|
|
let mut dst_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Active);
|
|
|
|
let relay_peer_id = *relay_swarm.local_peer_id();
|
|
let dst_peer_id = *dst_swarm.local_peer_id();
|
|
|
|
let relay_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr_via_relay = relay_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_peer_id.clone().into()))
|
|
.with(Protocol::P2pCircuit)
|
|
.with(dst_addr.into_iter().next().unwrap())
|
|
.with(Protocol::P2p(dst_peer_id.clone().into()));
|
|
|
|
relay_swarm.listen_on(relay_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, relay_swarm);
|
|
|
|
dst_swarm.listen_on(dst_addr.clone()).unwrap();
|
|
// Instruct destination node to listen for incoming relayed connections from unknown relay nodes.
|
|
dst_swarm.listen_on(Protocol::P2pCircuit.into()).unwrap();
|
|
spawn_swarm_on_pool(&pool, dst_swarm);
|
|
|
|
src_swarm.dial(dst_addr_via_relay).unwrap();
|
|
pool.run_until(async move {
|
|
// Source Node dialing Relay to connect to Destination Node.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Source Node establishing connection to Relay to connect to Destination Node.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Source Node establishing connection to destination node via Relay.
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {
|
|
break
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
// Source Node waiting for Ping from Destination Node via Relay.
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(ping::Event {
|
|
peer,
|
|
result: Ok(_),
|
|
})) => {
|
|
if peer == dst_peer_id {
|
|
break;
|
|
}
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn src_connect_to_dst_via_established_connection_to_relay() {
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut pool = LocalPool::new();
|
|
|
|
let mut src_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive);
|
|
let mut dst_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
|
|
let relay_peer_id = *relay_swarm.local_peer_id();
|
|
let dst_peer_id = *dst_swarm.local_peer_id();
|
|
|
|
let relay_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr_via_relay = relay_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_peer_id.clone().into()))
|
|
.with(Protocol::P2pCircuit)
|
|
.with(Protocol::P2p(dst_peer_id.clone().into()));
|
|
|
|
relay_swarm.listen_on(relay_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, relay_swarm);
|
|
|
|
let dst_listener = dst_swarm.listen_on(dst_addr_via_relay.clone()).unwrap();
|
|
// Wait for destination to listen via relay.
|
|
pool.run_until(async {
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(_) => {}
|
|
SwarmEvent::ConnectionEstablished { .. } => {}
|
|
SwarmEvent::NewListenAddr {
|
|
address,
|
|
listener_id,
|
|
} if listener_id == dst_listener => {
|
|
assert_eq!(address, dst_addr_via_relay);
|
|
break;
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
spawn_swarm_on_pool(&pool, dst_swarm);
|
|
|
|
pool.run_until(async move {
|
|
src_swarm.dial(relay_addr).unwrap();
|
|
|
|
// Source Node establishing connection to Relay.
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {
|
|
break
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
src_swarm.dial(dst_addr_via_relay).unwrap();
|
|
|
|
// Source Node establishing connection to destination node via Relay.
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {
|
|
break
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
// Source Node waiting for Ping from Destination Node via Relay.
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(ping::Event {
|
|
peer,
|
|
result: Ok(_),
|
|
})) => {
|
|
if peer == dst_peer_id {
|
|
break;
|
|
}
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn src_try_connect_to_offline_dst() {
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut pool = LocalPool::new();
|
|
|
|
let mut src_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive);
|
|
let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
|
|
let relay_peer_id = *relay_swarm.local_peer_id();
|
|
let dst_peer_id = PeerId::random();
|
|
|
|
let relay_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr_via_relay = relay_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_peer_id.clone().into()))
|
|
.with(Protocol::P2pCircuit)
|
|
.with(dst_addr.into_iter().next().unwrap())
|
|
.with(Protocol::P2p(dst_peer_id.clone().into()));
|
|
|
|
relay_swarm.listen_on(relay_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, relay_swarm);
|
|
|
|
src_swarm.dial(dst_addr_via_relay.clone()).unwrap();
|
|
pool.run_until(async move {
|
|
// Source Node dialing Relay to connect to Destination Node.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Source Node establishing connection to Relay to connect to Destination Node.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::OutgoingConnectionError {
|
|
error: DialError::Transport(addresses),
|
|
peer_id,
|
|
} if *addresses.iter().map(|(a, _)| a).next().unwrap() == dst_addr_via_relay => {
|
|
assert_eq!(peer_id, Some(dst_peer_id));
|
|
break;
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn src_try_connect_to_unsupported_dst() {
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut pool = LocalPool::new();
|
|
|
|
let mut src_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive);
|
|
let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
let mut dst_swarm = build_keep_alive_only_swarm();
|
|
|
|
let relay_peer_id = *relay_swarm.local_peer_id();
|
|
let dst_peer_id = *dst_swarm.local_peer_id();
|
|
|
|
let relay_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr_via_relay = relay_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_peer_id.clone().into()))
|
|
.with(Protocol::P2pCircuit)
|
|
.with(dst_addr.into_iter().next().unwrap())
|
|
.with(Protocol::P2p(dst_peer_id.clone().into()));
|
|
|
|
relay_swarm.listen_on(relay_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, relay_swarm);
|
|
|
|
dst_swarm.listen_on(dst_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, dst_swarm);
|
|
|
|
src_swarm.dial(dst_addr_via_relay.clone()).unwrap();
|
|
pool.run_until(async move {
|
|
// Source Node dialing Relay to connect to Destination Node.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Source Node establishing connection to Relay to connect to Destination Node.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::OutgoingConnectionError {
|
|
error: DialError::Transport(addresses),
|
|
peer_id,
|
|
} if *addresses.iter().map(|(a, _)| a).next().unwrap() == dst_addr_via_relay => {
|
|
assert_eq!(peer_id, Some(dst_peer_id));
|
|
break;
|
|
}
|
|
SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == relay_peer_id => {}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn src_try_connect_to_offline_dst_via_offline_relay() {
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut pool = LocalPool::new();
|
|
|
|
let mut src_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive);
|
|
|
|
let relay_peer_id = PeerId::random();
|
|
let dst_peer_id = PeerId::random();
|
|
|
|
let relay_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr_via_relay = relay_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_peer_id.clone().into()))
|
|
.with(Protocol::P2pCircuit)
|
|
.with(dst_addr.into_iter().next().unwrap())
|
|
.with(Protocol::P2p(dst_peer_id.clone().into()));
|
|
|
|
src_swarm.dial(dst_addr_via_relay.clone()).unwrap();
|
|
pool.run_until(async move {
|
|
// Source Node dialing Relay to connect to Destination Node.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Source Node fail to reach Relay.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::OutgoingConnectionError { peer_id, .. }
|
|
if peer_id == Some(relay_peer_id) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Source Node fail to reach Destination Node due to failure reaching Relay.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::OutgoingConnectionError {
|
|
error: DialError::Transport(addresses),
|
|
peer_id,
|
|
} if *addresses.iter().map(|(a, _)| a).next().unwrap() == dst_addr_via_relay => {
|
|
assert_eq!(peer_id, Some(dst_peer_id));
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routable_relay() {
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut pool = LocalPool::new();
|
|
|
|
let mut src_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive);
|
|
let mut dst_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive);
|
|
let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
|
|
let src_peer_id = *src_swarm.local_peer_id();
|
|
let dst_peer_id = *dst_swarm.local_peer_id();
|
|
let relay_peer_id = *relay_swarm.local_peer_id();
|
|
|
|
let relay_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr_via_relay = relay_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_peer_id.into()))
|
|
.with(Protocol::P2pCircuit)
|
|
.with(Protocol::P2p(dst_peer_id.into()));
|
|
|
|
src_swarm
|
|
.behaviour_mut()
|
|
.kad
|
|
.add_address(&relay_peer_id, relay_addr.clone());
|
|
dst_swarm
|
|
.behaviour_mut()
|
|
.kad
|
|
.add_address(&relay_peer_id, relay_addr.clone());
|
|
|
|
relay_swarm.listen_on(relay_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, relay_swarm);
|
|
|
|
// Destination Node listen via Relay.
|
|
let dst_listener = dst_swarm.listen_on(dst_addr_via_relay.clone()).unwrap();
|
|
|
|
pool.run_until(async {
|
|
// Destination Node dialing Relay.
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(peer_id) => assert_eq!(peer_id, relay_peer_id),
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Destination Node establishing connection to Relay.
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
|
|
assert_eq!(peer_id, relay_peer_id);
|
|
break;
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(_)) => {}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
// Destination Node reporting listen address via relay.
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::NewListenAddr {
|
|
address,
|
|
listener_id,
|
|
} if listener_id == dst_listener => {
|
|
assert_eq!(address, dst_addr_via_relay);
|
|
break;
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
|
|
..
|
|
})) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
// Destination Node bootstrapping.
|
|
let query_id = dst_swarm.behaviour_mut().kad.bootstrap().unwrap();
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(
|
|
KademliaEvent::OutboundQueryCompleted {
|
|
id,
|
|
result: QueryResult::Bootstrap(Ok(_)),
|
|
..
|
|
},
|
|
)) if query_id == id => {
|
|
if dst_swarm.behaviour_mut().kad.iter_queries().count() == 0 {
|
|
break;
|
|
}
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
|
|
..
|
|
})) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
let dst = async move {
|
|
// Destination Node receiving connection from Source Node via Relay.
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::IncomingConnection { send_back_addr, .. } => {
|
|
assert_eq!(send_back_addr, Protocol::P2p(src_peer_id.into()).into());
|
|
break;
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
// Destination Node establishing connection from Source Node via Relay.
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == src_peer_id => {
|
|
break;
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
// Destination Node waiting for Source Node to close connection.
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == relay_peer_id => {}
|
|
SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == src_peer_id => {
|
|
break;
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
};
|
|
|
|
let src = async move {
|
|
// Source Node looking for Destination Node on the Kademlia DHT.
|
|
let mut query_id = src_swarm.behaviour_mut().kad.get_closest_peers(dst_peer_id);
|
|
// One has to retry multiple times to wait for Relay to receive Identify event from Node
|
|
// B.
|
|
let mut tries = 0;
|
|
|
|
// Source Node establishing connection to Relay and, given that the DHT is small, Node
|
|
// B.
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
|
|
if peer_id == relay_peer_id {
|
|
continue;
|
|
} else if peer_id == dst_peer_id {
|
|
break;
|
|
} else {
|
|
panic!("Unexpected peer id {:?}", peer_id);
|
|
}
|
|
}
|
|
SwarmEvent::Dialing(peer_id)
|
|
if peer_id == relay_peer_id || peer_id == dst_peer_id => {}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(
|
|
KademliaEvent::OutboundQueryCompleted {
|
|
id,
|
|
result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { .. })),
|
|
..
|
|
},
|
|
)) if id == query_id => {
|
|
tries += 1;
|
|
if tries > 300 {
|
|
panic!("Too many retries.");
|
|
}
|
|
|
|
query_id = src_swarm.behaviour_mut().kad.get_closest_peers(dst_peer_id);
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
|
|
..
|
|
})) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
// Source Node waiting for Ping from Destination Node via Relay.
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(ping::Event {
|
|
peer,
|
|
result: Ok(_),
|
|
})) => {
|
|
if peer == dst_peer_id {
|
|
break;
|
|
}
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
|
|
..
|
|
})) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
};
|
|
|
|
futures::future::join(dst, src).await
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn inactive_connection_timeout() {
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut pool = LocalPool::new();
|
|
|
|
let mut src_swarm = build_keep_alive_swarm();
|
|
let mut dst_swarm = build_keep_alive_swarm();
|
|
let mut relay_swarm = build_keep_alive_swarm();
|
|
|
|
// Connections only kept alive by Source Node and Destination Node.
|
|
*relay_swarm.behaviour_mut().keep_alive.keep_alive_mut() = KeepAlive::No;
|
|
|
|
let relay_peer_id = *relay_swarm.local_peer_id();
|
|
let dst_peer_id = *dst_swarm.local_peer_id();
|
|
|
|
let relay_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr_via_relay = relay_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_peer_id.clone().into()))
|
|
.with(Protocol::P2pCircuit)
|
|
.with(Protocol::P2p(dst_peer_id.clone().into()));
|
|
|
|
relay_swarm.listen_on(relay_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, relay_swarm);
|
|
|
|
let new_listener = dst_swarm.listen_on(dst_addr_via_relay.clone()).unwrap();
|
|
// Wait for destination to listen via relay.
|
|
pool.run_until(async {
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(_) => {}
|
|
SwarmEvent::ConnectionEstablished { .. } => {}
|
|
SwarmEvent::NewListenAddr {
|
|
address,
|
|
listener_id,
|
|
} if listener_id == new_listener => {
|
|
assert_eq!(address, dst_addr_via_relay);
|
|
break;
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
spawn_swarm_on_pool(&pool, dst_swarm);
|
|
|
|
pool.run_until(async move {
|
|
src_swarm.dial(relay_addr).unwrap();
|
|
// Source Node dialing Relay.
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {
|
|
break;
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
|
|
src_swarm.dial(dst_addr_via_relay).unwrap();
|
|
|
|
// Source Node establishing connection to destination node via Relay.
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
// Relay should notice connection between Source Node and B to be idle. It should then close the
|
|
// relayed connection and eventually also close the connection to Source Node given that no
|
|
// connections are being relayed on the connection.
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionClosed { peer_id, .. } => {
|
|
if peer_id == relay_peer_id {
|
|
break;
|
|
}
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn concurrent_connection_same_relay_same_dst() {
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut pool = LocalPool::new();
|
|
|
|
let mut src_swarm = build_swarm(Reachability::Firewalled, RelayMode::Passive);
|
|
let mut dst_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
let mut relay_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
|
|
let relay_peer_id = *relay_swarm.local_peer_id();
|
|
let dst_peer_id = *dst_swarm.local_peer_id();
|
|
|
|
let relay_addr: Multiaddr = Protocol::Memory(rand::random::<u64>()).into();
|
|
let dst_addr_via_relay = relay_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_peer_id.clone().into()))
|
|
.with(Protocol::P2pCircuit)
|
|
.with(Protocol::P2p(dst_peer_id.clone().into()));
|
|
|
|
relay_swarm.listen_on(relay_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, relay_swarm);
|
|
|
|
let dst_listener = dst_swarm.listen_on(dst_addr_via_relay.clone()).unwrap();
|
|
// Wait for destination to listen via relay.
|
|
pool.run_until(async {
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(_) => {}
|
|
SwarmEvent::ConnectionEstablished { .. } => {}
|
|
SwarmEvent::NewListenAddr {
|
|
address,
|
|
listener_id,
|
|
} if listener_id == dst_listener => {
|
|
assert_eq!(address, dst_addr_via_relay);
|
|
break;
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
spawn_swarm_on_pool(&pool, dst_swarm);
|
|
|
|
pool.run_until(async move {
|
|
src_swarm.dial(dst_addr_via_relay.clone()).unwrap();
|
|
src_swarm.dial(dst_addr_via_relay).unwrap();
|
|
|
|
// Source Node establishing two connections to destination node via Relay.
|
|
let mut num_established = 0;
|
|
loop {
|
|
match src_swarm.select_next_some().await {
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {
|
|
num_established += 1;
|
|
if num_established == 2 {
|
|
break;
|
|
}
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
SwarmEvent::Dialing(peer_id) => {
|
|
assert_eq!(peer_id, relay_peer_id);
|
|
}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Yield incoming connection through listener that listens via the relay node used by the
|
|
/// connection. In case the local node does not listen via the specific relay node, but has
|
|
/// registered a listener for all remaining incoming relayed connections, yield the connection via
|
|
/// that _catch-all_ listener. Drop the connection in all other cases.
|
|
///
|
|
/// ## Nodes
|
|
///
|
|
/// - Destination node explicitly listening via relay node 1 and 2.
|
|
///
|
|
/// - 3 relay nodes
|
|
///
|
|
/// - Source node 1 and 2 connecting to destination node via relay 1 and 2 respectively.
|
|
///
|
|
/// - Source node 3 connecting to destination node via relay 3, expecting first connection attempt
|
|
/// to fail as destination node is not listening for incoming connections from any relay node.
|
|
/// Expecting second connection attempt to succeed after destination node registered listener for
|
|
/// any incoming relayed connection.
|
|
#[test]
|
|
fn yield_incoming_connection_through_correct_listener() {
|
|
let _ = env_logger::try_init();
|
|
let mut pool = LocalPool::new();
|
|
|
|
let mut dst_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
let mut src_1_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
let mut src_2_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
let mut src_3_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
let mut relay_1_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
let mut relay_2_swarm = build_swarm(Reachability::Routable, RelayMode::Passive);
|
|
let mut relay_3_swarm = build_swarm(Reachability::Routable, RelayMode::Active);
|
|
|
|
let dst_peer_id = *dst_swarm.local_peer_id();
|
|
let src_1_peer_id = *src_1_swarm.local_peer_id();
|
|
let src_2_peer_id = *src_2_swarm.local_peer_id();
|
|
let src_3_peer_id = *src_3_swarm.local_peer_id();
|
|
let relay_1_peer_id = *relay_1_swarm.local_peer_id();
|
|
let relay_2_peer_id = *relay_2_swarm.local_peer_id();
|
|
let relay_3_peer_id = *relay_3_swarm.local_peer_id();
|
|
|
|
let dst_memory_port = Protocol::Memory(rand::random::<u64>());
|
|
let dst_addr = Multiaddr::empty().with(dst_memory_port.clone());
|
|
|
|
let relay_1_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::<u64>()));
|
|
let relay_1_addr_incl_circuit = relay_1_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_1_peer_id.into()))
|
|
.with(Protocol::P2pCircuit);
|
|
let dst_addr_via_relay_1 = relay_1_addr_incl_circuit
|
|
.clone()
|
|
.with(Protocol::P2p(dst_peer_id.into()));
|
|
let relay_2_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::<u64>()));
|
|
let relay_2_addr_incl_circuit = relay_2_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_2_peer_id.into()))
|
|
.with(Protocol::P2pCircuit);
|
|
let dst_addr_via_relay_2 = relay_2_addr_incl_circuit
|
|
.clone()
|
|
.with(Protocol::P2p(dst_peer_id.into()));
|
|
let relay_3_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::<u64>()));
|
|
let relay_3_addr_incl_circuit = relay_3_addr
|
|
.clone()
|
|
.with(Protocol::P2p(relay_3_peer_id.into()))
|
|
.with(Protocol::P2pCircuit);
|
|
let dst_addr_via_relay_3 = relay_3_addr_incl_circuit
|
|
.clone()
|
|
.with(dst_memory_port)
|
|
.with(Protocol::P2p(dst_peer_id.into()));
|
|
|
|
relay_1_swarm.listen_on(relay_1_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, relay_1_swarm);
|
|
|
|
relay_2_swarm.listen_on(relay_2_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, relay_2_swarm);
|
|
|
|
relay_3_swarm.listen_on(relay_3_addr.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, relay_3_swarm);
|
|
|
|
let dst_listener_via_relay_1 = dst_swarm
|
|
.listen_on(relay_1_addr_incl_circuit.clone())
|
|
.unwrap();
|
|
let dst_listener_via_relay_2 = dst_swarm
|
|
.listen_on(relay_2_addr_incl_circuit.clone())
|
|
.unwrap();
|
|
// Listen on own address in order for relay 3 to be able to connect to destination node.
|
|
let dst_listener = dst_swarm.listen_on(dst_addr.clone()).unwrap();
|
|
|
|
// Wait for destination node to establish connections to relay 1 and 2.
|
|
pool.run_until(async {
|
|
let mut established = 0u8;
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(peer_id)
|
|
if peer_id == relay_1_peer_id || peer_id == relay_2_peer_id => {}
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. }
|
|
if peer_id == relay_1_peer_id || peer_id == relay_2_peer_id =>
|
|
{
|
|
established += 1;
|
|
if established == 2 {
|
|
break;
|
|
}
|
|
}
|
|
SwarmEvent::NewListenAddr {
|
|
address,
|
|
listener_id,
|
|
} if listener_id == dst_listener_via_relay_2 => {
|
|
assert_eq!(address, relay_2_addr_incl_circuit)
|
|
}
|
|
SwarmEvent::NewListenAddr {
|
|
address,
|
|
listener_id,
|
|
} if listener_id == dst_listener_via_relay_1 => {
|
|
assert_eq!(address, relay_1_addr_incl_circuit)
|
|
}
|
|
SwarmEvent::NewListenAddr {
|
|
address,
|
|
listener_id,
|
|
} if listener_id == dst_listener => assert_eq!(address, dst_addr),
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
|
|
src_1_swarm.dial(dst_addr_via_relay_1.clone()).unwrap();
|
|
src_2_swarm.dial(dst_addr_via_relay_2.clone()).unwrap();
|
|
spawn_swarm_on_pool(&pool, src_1_swarm);
|
|
spawn_swarm_on_pool(&pool, src_2_swarm);
|
|
|
|
// Expect source node 1 and 2 to reach destination node via relay 1 and 2 respectively.
|
|
pool.run_until(async {
|
|
let mut src_1_established = false;
|
|
let mut src_2_established = false;
|
|
let mut src_1_ping = false;
|
|
let mut src_2_ping = false;
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::IncomingConnection { .. } => {}
|
|
SwarmEvent::ConnectionEstablished {
|
|
peer_id, endpoint, ..
|
|
} => {
|
|
let local_addr = match endpoint {
|
|
ConnectedPoint::Dialer { .. } => unreachable!(),
|
|
ConnectedPoint::Listener { local_addr, .. } => local_addr,
|
|
};
|
|
|
|
if peer_id == src_1_peer_id {
|
|
assert_eq!(local_addr, relay_1_addr_incl_circuit);
|
|
src_1_established = true;
|
|
} else if peer_id == src_2_peer_id {
|
|
assert_eq!(local_addr, relay_2_addr_incl_circuit);
|
|
src_2_established = true;
|
|
} else {
|
|
unreachable!();
|
|
}
|
|
}
|
|
SwarmEvent::NewListenAddr { address, .. }
|
|
if address == relay_1_addr_incl_circuit
|
|
|| address == relay_2_addr_incl_circuit
|
|
|| address == dst_addr => {}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(ping::Event {
|
|
peer,
|
|
result: Ok(_),
|
|
})) => {
|
|
if peer == src_1_peer_id {
|
|
src_1_ping = true;
|
|
} else if peer == src_2_peer_id {
|
|
src_2_ping = true;
|
|
}
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
|
|
..
|
|
})) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
|
|
if src_1_established && src_2_established && src_1_ping && src_2_ping {
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Expect destination node to reject incoming connection from unknown relay given that
|
|
// destination node is not listening for such connections.
|
|
src_3_swarm.dial(dst_addr_via_relay_3.clone()).unwrap();
|
|
pool.run_until(async {
|
|
loop {
|
|
futures::select! {
|
|
event = dst_swarm.select_next_some() => match event {
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
SwarmEvent::IncomingConnection { .. } => {}
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. }
|
|
if peer_id == relay_3_peer_id => {}
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. }
|
|
if peer_id == src_3_peer_id =>
|
|
{
|
|
panic!(
|
|
"Expected destination node to reject incoming connection from unknown relay \
|
|
without a catch-all listener",
|
|
);
|
|
}
|
|
e => panic!("{:?}", e),
|
|
},
|
|
event = src_3_swarm.select_next_some() => match event {
|
|
SwarmEvent::OutgoingConnectionError {
|
|
error: DialError::Transport(addresses),
|
|
peer_id,
|
|
} if *addresses.iter().map(|(a, _)| a).next().unwrap()
|
|
== dst_addr_via_relay_3 =>
|
|
{
|
|
assert_eq!(peer_id, Some(dst_peer_id));
|
|
break;
|
|
}
|
|
SwarmEvent::Dialing { .. } => {}
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. }
|
|
if peer_id == relay_3_peer_id => {}
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. }
|
|
if peer_id == dst_peer_id =>
|
|
{
|
|
panic!(
|
|
"Expected destination node to reject incoming connection from unknown relay \
|
|
without a catch-all listener",
|
|
);
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
// Instruct destination node to listen for incoming relayed connections from unknown relay nodes.
|
|
dst_swarm.listen_on(Protocol::P2pCircuit.into()).unwrap();
|
|
// Wait for destination node to report new listen address.
|
|
pool.run_until(async {
|
|
loop {
|
|
match dst_swarm.select_next_some().await {
|
|
SwarmEvent::NewListenAddr { address, .. }
|
|
if address == Protocol::P2pCircuit.into() =>
|
|
{
|
|
break
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
|
|
..
|
|
})) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
spawn_swarm_on_pool(&pool, dst_swarm);
|
|
|
|
// Expect destination node to accept incoming connection from "unknown" relay, i.e. the
|
|
// connection from source node 3 via relay 3.
|
|
src_3_swarm.dial(dst_addr_via_relay_3.clone()).unwrap();
|
|
pool.run_until(async move {
|
|
loop {
|
|
match src_3_swarm.select_next_some().await {
|
|
SwarmEvent::Dialing(_) => {}
|
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {
|
|
break
|
|
}
|
|
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
|
|
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
|
|
..
|
|
})) => {}
|
|
e => panic!("{:?}", e),
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
#[derive(NetworkBehaviour)]
|
|
#[behaviour(
|
|
out_event = "CombinedEvent",
|
|
poll_method = "poll",
|
|
event_process = true
|
|
)]
|
|
struct CombinedBehaviour {
|
|
relay: Relay,
|
|
ping: ping::Behaviour,
|
|
kad: Kademlia<MemoryStore>,
|
|
identify: Identify,
|
|
|
|
#[behaviour(ignore)]
|
|
events: Vec<CombinedEvent>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
enum CombinedEvent {
|
|
Kad(KademliaEvent),
|
|
Ping(ping::Event),
|
|
}
|
|
|
|
impl CombinedBehaviour {
|
|
fn poll(
|
|
&mut self,
|
|
_: &mut Context,
|
|
_: &mut impl PollParameters,
|
|
) -> Poll<NetworkBehaviourAction<CombinedEvent, <Self as NetworkBehaviour>::ProtocolsHandler>>
|
|
{
|
|
if !self.events.is_empty() {
|
|
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
|
|
}
|
|
|
|
Poll::Pending
|
|
}
|
|
}
|
|
|
|
impl NetworkBehaviourEventProcess<ping::Event> for CombinedBehaviour {
|
|
fn inject_event(&mut self, event: ping::Event) {
|
|
self.events.push(CombinedEvent::Ping(event));
|
|
}
|
|
}
|
|
|
|
impl NetworkBehaviourEventProcess<KademliaEvent> for CombinedBehaviour {
|
|
fn inject_event(&mut self, event: KademliaEvent) {
|
|
self.events.push(CombinedEvent::Kad(event));
|
|
}
|
|
}
|
|
|
|
impl NetworkBehaviourEventProcess<IdentifyEvent> for CombinedBehaviour {
|
|
fn inject_event(&mut self, event: IdentifyEvent) {
|
|
if let IdentifyEvent::Received {
|
|
peer_id,
|
|
info: IdentifyInfo { listen_addrs, .. },
|
|
..
|
|
} = event
|
|
{
|
|
for addr in listen_addrs {
|
|
self.kad.add_address(&peer_id, addr);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl NetworkBehaviourEventProcess<()> for CombinedBehaviour {
|
|
fn inject_event(&mut self, _event: ()) {
|
|
unreachable!();
|
|
}
|
|
}
|
|
|
|
#[derive(NetworkBehaviour)]
|
|
#[behaviour(event_process = true)]
|
|
struct CombinedKeepAliveBehaviour {
|
|
relay: Relay,
|
|
keep_alive: DummyBehaviour,
|
|
}
|
|
|
|
impl NetworkBehaviourEventProcess<()> for CombinedKeepAliveBehaviour {
|
|
fn inject_event(&mut self, _event: ()) {
|
|
unreachable!();
|
|
}
|
|
}
|
|
|
|
impl NetworkBehaviourEventProcess<Void> for CombinedKeepAliveBehaviour {
|
|
fn inject_event(&mut self, _event: Void) {
|
|
unreachable!();
|
|
}
|
|
}
|
|
|
|
enum Reachability {
|
|
Firewalled,
|
|
Routable,
|
|
}
|
|
|
|
enum RelayMode {
|
|
Active,
|
|
Passive,
|
|
}
|
|
|
|
impl From<RelayMode> for bool {
|
|
fn from(relay: RelayMode) -> Self {
|
|
match relay {
|
|
RelayMode::Active => true,
|
|
RelayMode::Passive => false,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Wrapper around a [`Transport`] allowing outgoing but denying incoming connections.
|
|
///
|
|
/// Meant for testing purposes only.
|
|
#[derive(Clone)]
|
|
pub struct Firewall<T>(pub T);
|
|
|
|
impl<T: Transport> Transport for Firewall<T> {
|
|
type Output = <T as Transport>::Output;
|
|
type Error = <T as Transport>::Error;
|
|
type Listener = futures::stream::Pending<<<T as Transport>::Listener as Stream>::Item>;
|
|
type ListenerUpgrade = <T as Transport>::ListenerUpgrade;
|
|
type Dial = <T as Transport>::Dial;
|
|
|
|
fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
|
|
Ok(futures::stream::pending())
|
|
}
|
|
|
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
|
self.0.dial(addr)
|
|
}
|
|
|
|
fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
|
self.0.dial_as_listener(addr)
|
|
}
|
|
|
|
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
|
|
self.0.address_translation(server, observed)
|
|
}
|
|
}
|
|
|
|
fn build_swarm(reachability: Reachability, relay_mode: RelayMode) -> Swarm<CombinedBehaviour> {
|
|
let local_key = identity::Keypair::generate_ed25519();
|
|
let local_public_key = local_key.public();
|
|
let plaintext = PlainText2Config {
|
|
local_public_key: local_public_key.clone(),
|
|
};
|
|
let local_peer_id = local_public_key.to_peer_id();
|
|
|
|
let transport = MemoryTransport::default();
|
|
|
|
let transport = match reachability {
|
|
Reachability::Firewalled => EitherTransport::Left(Firewall(transport)),
|
|
Reachability::Routable => EitherTransport::Right(transport),
|
|
};
|
|
|
|
let (transport, relay_behaviour) = new_transport_and_behaviour(
|
|
RelayConfig {
|
|
actively_connect_to_dst_nodes: relay_mode.into(),
|
|
..Default::default()
|
|
},
|
|
transport,
|
|
);
|
|
|
|
let transport = transport
|
|
.upgrade(upgrade::Version::V1)
|
|
.authenticate(plaintext)
|
|
.multiplex(libp2p_yamux::YamuxConfig::default())
|
|
.boxed();
|
|
|
|
let combined_behaviour = CombinedBehaviour {
|
|
relay: relay_behaviour,
|
|
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_millis(100))),
|
|
kad: Kademlia::new(
|
|
local_peer_id.clone(),
|
|
MemoryStore::new(local_peer_id.clone()),
|
|
),
|
|
identify: Identify::new(IdentifyConfig::new(
|
|
"test".to_string(),
|
|
local_public_key.clone(),
|
|
)),
|
|
events: Default::default(),
|
|
};
|
|
|
|
Swarm::new(transport, combined_behaviour, local_peer_id)
|
|
}
|
|
|
|
fn build_keep_alive_swarm() -> Swarm<CombinedKeepAliveBehaviour> {
|
|
let local_key = identity::Keypair::generate_ed25519();
|
|
let local_public_key = local_key.public();
|
|
let plaintext = PlainText2Config {
|
|
local_public_key: local_public_key.clone(),
|
|
};
|
|
let local_peer_id = local_public_key.to_peer_id();
|
|
|
|
let transport = MemoryTransport::default();
|
|
|
|
let (transport, relay_behaviour) =
|
|
new_transport_and_behaviour(RelayConfig::default(), transport);
|
|
|
|
let transport = transport
|
|
.upgrade(upgrade::Version::V1)
|
|
.authenticate(plaintext)
|
|
.multiplex(libp2p_yamux::YamuxConfig::default())
|
|
.boxed();
|
|
|
|
let combined_behaviour = CombinedKeepAliveBehaviour {
|
|
relay: relay_behaviour,
|
|
keep_alive: DummyBehaviour::with_keep_alive(KeepAlive::Yes),
|
|
};
|
|
|
|
Swarm::new(transport, combined_behaviour, local_peer_id)
|
|
}
|
|
|
|
fn build_keep_alive_only_swarm() -> Swarm<DummyBehaviour> {
|
|
let local_key = identity::Keypair::generate_ed25519();
|
|
let local_public_key = local_key.public();
|
|
let plaintext = PlainText2Config {
|
|
local_public_key: local_public_key.clone(),
|
|
};
|
|
let local_peer_id = local_public_key.to_peer_id();
|
|
|
|
let transport = MemoryTransport::default();
|
|
|
|
let transport = transport
|
|
.upgrade(upgrade::Version::V1)
|
|
.authenticate(plaintext)
|
|
.multiplex(libp2p_yamux::YamuxConfig::default())
|
|
.boxed();
|
|
|
|
Swarm::new(
|
|
transport,
|
|
DummyBehaviour::with_keep_alive(KeepAlive::Yes),
|
|
local_peer_id,
|
|
)
|
|
}
|
|
|
|
fn spawn_swarm_on_pool<B: NetworkBehaviour>(pool: &LocalPool, mut swarm: Swarm<B>) {
|
|
pool.spawner()
|
|
.spawn_obj(
|
|
async move {
|
|
loop {
|
|
swarm.next().await;
|
|
}
|
|
}
|
|
.boxed()
|
|
.into(),
|
|
)
|
|
.unwrap();
|
|
}
|