feat: Add WebRTC transport (#2622)

Hey 👋 This is a WebRTC transport implemented in accordance w/ the [spec](https://github.com/libp2p/specs/pull/412). It's based on the [webrtc-rs](https://github.com/webrtc-rs/webrtc) library.

Resolves: #1066.
This commit is contained in:
Anton Kaliaev 2022-11-17 09:17:31 +04:00 committed by GitHub
parent 43fdfe27ea
commit a714864885
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 4220 additions and 3 deletions

View File

@ -44,6 +44,7 @@ full = [
"wasm-bindgen", "wasm-bindgen",
"wasm-ext", "wasm-ext",
"wasm-ext-websocket", "wasm-ext-websocket",
"webrtc",
"websocket", "websocket",
"yamux", "yamux",
] ]
@ -75,11 +76,12 @@ rsa = ["libp2p-core/rsa"]
secp256k1 = ["libp2p-core/secp256k1"] secp256k1 = ["libp2p-core/secp256k1"]
serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"] serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"]
tcp = ["dep:libp2p-tcp"] tcp = ["dep:libp2p-tcp"]
tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"] tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio", "libp2p-webrtc?/tokio"]
uds = ["dep:libp2p-uds"] uds = ["dep:libp2p-uds"]
wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js"] wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js"]
wasm-ext = ["dep:libp2p-wasm-ext"] wasm-ext = ["dep:libp2p-wasm-ext"]
wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"] wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"]
webrtc = ["dep:libp2p-webrtc", "libp2p-webrtc?/pem"]
websocket = ["dep:libp2p-websocket"] websocket = ["dep:libp2p-websocket"]
yamux = ["dep:libp2p-yamux"] yamux = ["dep:libp2p-yamux"]
@ -108,6 +110,7 @@ libp2p-request-response = { version = "0.23.0", path = "protocols/request-respon
libp2p-swarm = { version = "0.41.0", path = "swarm" } libp2p-swarm = { version = "0.41.0", path = "swarm" }
libp2p-uds = { version = "0.37.0", path = "transports/uds", optional = true } libp2p-uds = { version = "0.37.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.38.0", path = "transports/wasm-ext", optional = true } libp2p-wasm-ext = { version = "0.38.0", path = "transports/wasm-ext", optional = true }
libp2p-webrtc = { version = "0.1.0-alpha", path = "transports/webrtc", optional = true }
libp2p-yamux = { version = "0.42.0", path = "muxers/yamux", optional = true } libp2p-yamux = { version = "0.42.0", path = "muxers/yamux", optional = true }
multiaddr = { version = "0.16.0" } multiaddr = { version = "0.16.0" }
parking_lot = "0.12.0" parking_lot = "0.12.0"
@ -168,7 +171,8 @@ members = [
"transports/tcp", "transports/tcp",
"transports/uds", "transports/uds",
"transports/websocket", "transports/websocket",
"transports/wasm-ext" "transports/wasm-ext",
"transports/webrtc"
] ]
[[example]] [[example]]

View File

@ -1,7 +1,10 @@
# 0.3.0 [unreleased] # 0.3.0 [unreleased]
- Implement `From` trait for `std::io::Error`. See [PR 2622].
- Don't leak `prost` dependency in `Error` type. See [PR 3058]. - Don't leak `prost` dependency in `Error` type. See [PR 3058].
[PR 2622]: https://github.com/libp2p/rust-libp2p/pull/2622/
[PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058/ [PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058/
# 0.2.0 # 0.2.0

View File

@ -20,7 +20,7 @@ unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }
[dev-dependencies] [dev-dependencies]
prost-build = "0.11" prost-build = "0.11"
# Passing arguments to the docsrs builder in order to properly document cfg's. # Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling # More information: https://docs.rs/about/builds#cross-compiling
[package.metadata.docs.rs] [package.metadata.docs.rs]
all-features = true all-features = true

View File

@ -65,3 +65,9 @@ impl<In, Out: Message + Default> Decoder for Codec<In, Out> {
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
#[error("Failed to encode/decode message")] #[error("Failed to encode/decode message")]
pub struct Error(#[from] std::io::Error); pub struct Error(#[from] std::io::Error);
impl From<Error> for std::io::Error {
fn from(e: Error) -> Self {
e.0
}
}

View File

@ -125,6 +125,10 @@ pub use libp2p_uds as uds;
#[cfg(feature = "wasm-ext")] #[cfg(feature = "wasm-ext")]
#[doc(inline)] #[doc(inline)]
pub use libp2p_wasm_ext as wasm_ext; pub use libp2p_wasm_ext as wasm_ext;
#[cfg(feature = "webrtc")]
#[cfg_attr(docsrs, doc(cfg(feature = "webrtc")))]
#[doc(inline)]
pub use libp2p_webrtc as webrtc;
#[cfg(feature = "websocket")] #[cfg(feature = "websocket")]
#[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))]
#[doc(inline)] #[doc(inline)]

View File

@ -0,0 +1,58 @@
[package]
name = "libp2p-webrtc"
version = "0.1.0-alpha"
authors = ["Parity Technologies <admin@parity.io>"]
description = "WebRTC transport for libp2p"
repository = "https://github.com/libp2p/rust-libp2p"
license = "MIT"
edition = "2021"
keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[dependencies]
async-trait = "0.1"
asynchronous-codec = "0.6.1"
bytes = "1"
futures = "0.3"
futures-timer = "3"
hex = "0.4"
if-watch = "2.0"
libp2p-core = { version = "0.38.0", path = "../../core" }
libp2p-noise = { version = "0.41.0", path = "../../transports/noise" }
log = "0.4"
multihash = { version = "0.16", default-features = false, features = ["sha2"] }
prost = "0.11"
prost-codec = { version = "0.3.0", path = "../../misc/prost-codec" }
rand = "0.8"
rcgen = "0.9.3"
serde = { version = "1.0", features = ["derive"] }
stun = "0.4"
thiserror = "1"
tinytemplate = "1.2"
tokio = { version = "1.19", features = ["net"], optional = true}
tokio-util = { version = "0.7", features = ["compat"], optional = true }
webrtc = { version = "0.6.0", optional = true }
[features]
tokio = ["dep:tokio", "dep:tokio-util", "dep:webrtc"]
pem = ["webrtc?/pem"]
[build-dependencies]
prost-build = "0.11"
[dev-dependencies]
anyhow = "1.0"
env_logger = "0.9"
hex-literal = "0.3"
libp2p = { path = "../..", features = ["full"] }
tokio = { version = "1.19", features = ["full"] }
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }
void = "1"
[[test]]
name = "smoke"
required-features = ["tokio"]
[[example]]
name = "listen_ping"
required-features = ["tokio"]

View File

@ -0,0 +1,23 @@
// Copyright 2022 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
fn main() {
prost_build::compile_protos(&["src/message.proto"], &["src"]).unwrap();
}

View File

@ -0,0 +1,66 @@
use anyhow::Result;
use futures::StreamExt;
use libp2p::swarm::{keep_alive, NetworkBehaviour};
use libp2p::Transport;
use libp2p::{ping, Swarm};
use libp2p_core::identity;
use libp2p_core::muxing::StreamMuxerBox;
use rand::thread_rng;
use void::Void;
/// An example WebRTC server that will accept connections and run the ping protocol on them.
#[tokio::main]
async fn main() -> Result<()> {
let mut swarm = create_swarm()?;
swarm.listen_on("/ip4/127.0.0.1/udp/0/webrtc".parse()?)?;
loop {
let event = swarm.next().await.unwrap();
eprintln!("New event: {event:?}")
}
}
fn create_swarm() -> Result<Swarm<Behaviour>> {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = id_keys.public().to_peer_id();
let transport = libp2p_webrtc::tokio::Transport::new(
id_keys,
libp2p_webrtc::tokio::Certificate::generate(&mut thread_rng())?,
);
let transport = transport
.map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn)))
.boxed();
Ok(Swarm::with_tokio_executor(
transport,
Behaviour::default(),
peer_id,
))
}
#[derive(NetworkBehaviour, Default)]
#[behaviour(out_event = "Event", event_process = false)]
struct Behaviour {
ping: ping::Behaviour,
keep_alive: keep_alive::Behaviour,
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum Event {
Ping(ping::Event),
}
impl From<ping::Event> for Event {
fn from(e: ping::Event) -> Self {
Event::Ping(e)
}
}
impl From<Void> for Event {
fn from(event: Void) -> Self {
void::unreachable(event)
}
}

View File

@ -0,0 +1,90 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Implementation of the [`libp2p_core::Transport`] trait for WebRTC protocol without a signaling
//! server.
//!
//! # Overview
//!
//! ## ICE
//!
//! RFCs: 8839, 8445 See also:
//! <https://tools.ietf.org/id/draft-ietf-rtcweb-sdp-08.html#rfc.section.5.2.3>
//!
//! The WebRTC protocol uses ICE in order to establish a connection.
//!
//! In a typical ICE setup, there are two endpoints, called agents, that want to communicate. One
//! of these two agents can be the local browser, while the other agent is the target of the
//! connection.
//!
//! Even though in this specific context all we want is a simple client-server communication, it is
//! helpful to keep in mind that ICE was designed to solve the problem of NAT traversal.
//!
//! The ICE workflow works as follows:
//!
//! - An "offerer" determines ways in which it could be accessible (either an
//! IP address or through a relay using a TURN server), which are called "candidates". It then
//! generates a small text payload in a format called SDP, that describes the request for a
//! connection.
//! - The offerer sends this SDP-encoded message to the answerer. The medium through which this
//! exchange is done is out of scope of the ICE protocol.
//! - The answerer then finds its own candidates, and generates an answer, again in the SDP format.
//! This answer is sent back to the offerer.
//! - Each agent then tries to connect to the remote's candidates.
//!
//! We pretend to send the offer to the remote agent (the target of the connection), then pretend
//! that it has found a valid IP address for itself (i.e. a candidate), then pretend that the SDP
//! answer containing this candidate has been sent back. This will cause the offerer to execute
//! step 4: try to connect to the remote's candidate.
//!
//! ## TCP or UDP
//!
//! WebRTC by itself doesn't hardcode any specific protocol for media streams. Instead, it is the
//! SDP message of the offerer that specifies which protocol to use. In our use case (one or more
//! data channels), we know that the offerer will always request either TCP+DTLS+SCTP, or
//! UDP+DTLS+SCTP.
//!
//! The implementation only supports UDP at the moment, so if the offerer requests TCP+DTLS+SCTP, it
//! will not respond. Support for TCP may be added in the future (see
//! <https://github.com/webrtc-rs/webrtc/issues/132>).
//!
//! ## DTLS+SCTP
//!
//! RFCs: 8841, 8832
//!
//! In both cases (TCP or UDP), the next layer is DTLS. DTLS is similar to the well-known TLS
//! protocol, except that it doesn't guarantee ordering of delivery (as this is instead provided by
//! the SCTP layer on top of DTLS). In other words, once the TCP or UDP connection is established,
//! the browser will try to perform a DTLS handshake.
//!
//! During the ICE negotiation, each agent must include in its SDP packet a hash of the self-signed
//! certificate that it will use during the DTLS handshake. In our use-case, where we try to
//! hand-crate the SDP answer generated by the remote, this is problematic. A way to solve this
//! is to make the hash a part of the remote's multiaddr. On the server side, we turn
//! certificate verification off.
mod message_proto {
#![allow(clippy::derive_partial_eq_without_eq)]
include!(concat!(env!("OUT_DIR"), "/webrtc.pb.rs"));
}
#[cfg(feature = "tokio")]
pub mod tokio;

View File

@ -0,0 +1,20 @@
syntax = "proto2";
package webrtc.pb;
message Message {
enum Flag {
// The sender will no longer send messages on the stream.
FIN = 0;
// The sender will no longer read messages on the stream. Incoming data is
// being discarded on receipt.
STOP_SENDING = 1;
// The sender abruptly terminates the sending part of the stream. The
// receiver can discard any data that it already received on that stream.
RESET = 2;
}
optional Flag flag=1;
optional bytes message = 2;
}

View File

@ -0,0 +1,120 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use rand::{distributions::DistString, CryptoRng, Rng};
use webrtc::peer_connection::certificate::RTCCertificate;
use crate::tokio::fingerprint::Fingerprint;
#[derive(Debug, Clone, PartialEq)]
pub struct Certificate {
inner: RTCCertificate,
}
impl Certificate {
/// Generate new certificate.
///
/// `_rng` argument is ignored for now. See <https://github.com/melekes/rust-libp2p/pull/12>.
pub fn generate<R>(_rng: &mut R) -> Result<Self, Error>
where
R: CryptoRng + Rng,
{
let mut params = rcgen::CertificateParams::new(vec![
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
]);
params.alg = &rcgen::PKCS_ECDSA_P256_SHA256;
Ok(Self {
inner: RTCCertificate::from_params(params).expect("default params to work"),
})
}
/// Returns SHA-256 fingerprint of this certificate.
///
/// # Panics
///
/// This function will panic if there's no fingerprint with the SHA-256 algorithm (see
/// [`RTCCertificate::get_fingerprints`]).
pub fn fingerprint(&self) -> Fingerprint {
let fingerprints = self.inner.get_fingerprints();
let sha256_fingerprint = fingerprints
.iter()
.find(|f| f.algorithm == "sha-256")
.expect("a SHA-256 fingerprint");
Fingerprint::try_from_rtc_dtls(sha256_fingerprint).expect("we filtered by sha-256")
}
/// Parses a certificate from the ASCII PEM format.
///
/// See [`RTCCertificate::from_pem`]
#[cfg(feature = "pem")]
pub fn from_pem(pem_str: &str) -> Result<Self, Error> {
Ok(Self {
inner: RTCCertificate::from_pem(pem_str).map_err(Kind::InvalidPEM)?,
})
}
/// Serializes the certificate (including the private key) in PKCS#8 format in PEM.
///
/// See [`RTCCertificate::serialize_pem`]
#[cfg(feature = "pem")]
pub fn serialize_pem(&self) -> String {
self.inner.serialize_pem()
}
/// Extract the [`RTCCertificate`] from this wrapper.
///
/// This function is `pub(crate)` to avoid leaking the `webrtc` dependency to our users.
pub(crate) fn to_rtc_certificate(&self) -> RTCCertificate {
self.inner.clone()
}
}
#[derive(thiserror::Error, Debug)]
#[error("Failed to generate certificate")]
pub struct Error(#[from] Kind);
#[derive(thiserror::Error, Debug)]
enum Kind {
#[error(transparent)]
InvalidPEM(#[from] webrtc::Error),
}
#[cfg(test)]
mod test {
#[cfg(feature = "pem")]
use anyhow::Result;
#[cfg(feature = "pem")]
#[test]
fn test_certificate_serialize_pem_and_from_pem() -> Result<()> {
use super::*;
use rand::thread_rng;
let cert = Certificate::generate(&mut thread_rng()).unwrap();
let pem = cert.serialize_pem();
let loaded_cert = Certificate::from_pem(&pem)?;
assert_eq!(loaded_cert, cert);
Ok(())
}
}

View File

@ -0,0 +1,299 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::stream::FuturesUnordered;
use futures::{
channel::{
mpsc,
oneshot::{self, Sender},
},
lock::Mutex as FutMutex,
StreamExt,
{future::BoxFuture, ready},
};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use webrtc::data::data_channel::DataChannel as DetachedDataChannel;
use webrtc::data_channel::RTCDataChannel;
use webrtc::peer_connection::RTCPeerConnection;
use std::task::Waker;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use crate::tokio::{error::Error, substream, substream::Substream};
/// Maximum number of unprocessed data channels.
/// See [`Connection::poll_inbound`].
const MAX_DATA_CHANNELS_IN_FLIGHT: usize = 10;
/// A WebRTC connection, wrapping [`RTCPeerConnection`] and implementing [`StreamMuxer`] trait.
pub struct Connection {
/// [`RTCPeerConnection`] to the remote peer.
///
/// Uses futures mutex because used in async code (see poll_outbound and poll_close).
peer_conn: Arc<FutMutex<RTCPeerConnection>>,
/// Channel onto which incoming data channels are put.
incoming_data_channels_rx: mpsc::Receiver<Arc<DetachedDataChannel>>,
/// Future, which, once polled, will result in an outbound substream.
outbound_fut: Option<BoxFuture<'static, Result<Arc<DetachedDataChannel>, Error>>>,
/// Future, which, once polled, will result in closing the entire connection.
close_fut: Option<BoxFuture<'static, Result<(), Error>>>,
/// A list of futures, which, once completed, signal that a [`Substream`] has been dropped.
drop_listeners: FuturesUnordered<substream::DropListener>,
no_drop_listeners_waker: Option<Waker>,
}
impl Unpin for Connection {}
impl Connection {
/// Creates a new connection.
pub(crate) async fn new(rtc_conn: RTCPeerConnection) -> Self {
let (data_channel_tx, data_channel_rx) = mpsc::channel(MAX_DATA_CHANNELS_IN_FLIGHT);
Connection::register_incoming_data_channels_handler(
&rtc_conn,
Arc::new(FutMutex::new(data_channel_tx)),
)
.await;
Self {
peer_conn: Arc::new(FutMutex::new(rtc_conn)),
incoming_data_channels_rx: data_channel_rx,
outbound_fut: None,
close_fut: None,
drop_listeners: FuturesUnordered::default(),
no_drop_listeners_waker: None,
}
}
/// Registers a handler for incoming data channels.
///
/// NOTE: `mpsc::Sender` is wrapped in `Arc` because cloning a raw sender would make the channel
/// unbounded. "The channels capacity is equal to buffer + num-senders. In other words, each
/// sender gets a guaranteed slot in the channel capacity..."
/// See <https://docs.rs/futures/latest/futures/channel/mpsc/fn.channel.html>
async fn register_incoming_data_channels_handler(
rtc_conn: &RTCPeerConnection,
tx: Arc<FutMutex<mpsc::Sender<Arc<DetachedDataChannel>>>>,
) {
rtc_conn.on_data_channel(Box::new(move |data_channel: Arc<RTCDataChannel>| {
log::debug!("Incoming data channel {}", data_channel.id());
let tx = tx.clone();
Box::pin(async move {
data_channel.on_open({
let data_channel = data_channel.clone();
Box::new(move || {
log::debug!("Data channel {} open", data_channel.id());
Box::pin(async move {
let data_channel = data_channel.clone();
let id = data_channel.id();
match data_channel.detach().await {
Ok(detached) => {
let mut tx = tx.lock().await;
if let Err(e) = tx.try_send(detached.clone()) {
log::error!("Can't send data channel {}: {}", id, e);
// We're not accepting data channels fast enough =>
// close this channel.
//
// Ideally we'd refuse to accept a data channel
// during the negotiation process, but it's not
// possible with the current API.
if let Err(e) = detached.close().await {
log::error!(
"Failed to close data channel {}: {}",
id,
e
);
}
}
}
Err(e) => {
log::error!("Can't detach data channel {}: {}", id, e);
}
};
})
})
});
})
}));
}
}
impl StreamMuxer for Connection {
type Substream = Substream;
type Error = Error;
fn poll_inbound(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
match ready!(self.incoming_data_channels_rx.poll_next_unpin(cx)) {
Some(detached) => {
log::trace!("Incoming substream {}", detached.stream_identifier());
let (substream, drop_listener) = Substream::new(detached);
self.drop_listeners.push(drop_listener);
if let Some(waker) = self.no_drop_listeners_waker.take() {
waker.wake()
}
Poll::Ready(Ok(substream))
}
None => {
debug_assert!(
false,
"Sender-end of channel should be owned by `RTCPeerConnection`"
);
Poll::Pending // Return `Pending` without registering a waker: If the channel is closed, we don't need to be called anymore.
}
}
}
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
loop {
match ready!(self.drop_listeners.poll_next_unpin(cx)) {
Some(Ok(())) => {}
Some(Err(e)) => {
log::debug!("a DropListener failed: {e}")
}
None => {
self.no_drop_listeners_waker = Some(cx.waker().clone());
return Poll::Pending;
}
}
}
}
fn poll_outbound(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
let peer_conn = self.peer_conn.clone();
let fut = self.outbound_fut.get_or_insert(Box::pin(async move {
let peer_conn = peer_conn.lock().await;
let data_channel = peer_conn.create_data_channel("", None).await?;
// No need to hold the lock during the DTLS handshake.
drop(peer_conn);
log::trace!("Opening data channel {}", data_channel.id());
let (tx, rx) = oneshot::channel::<Arc<DetachedDataChannel>>();
// Wait until the data channel is opened and detach it.
register_data_channel_open_handler(data_channel, tx).await;
// Wait until data channel is opened and ready to use
match rx.await {
Ok(detached) => Ok(detached),
Err(e) => Err(Error::Internal(e.to_string())),
}
}));
match ready!(fut.as_mut().poll(cx)) {
Ok(detached) => {
self.outbound_fut = None;
log::trace!("Outbound substream {}", detached.stream_identifier());
let (substream, drop_listener) = Substream::new(detached);
self.drop_listeners.push(drop_listener);
if let Some(waker) = self.no_drop_listeners_waker.take() {
waker.wake()
}
Poll::Ready(Ok(substream))
}
Err(e) => {
self.outbound_fut = None;
Poll::Ready(Err(e))
}
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
log::debug!("Closing connection");
let peer_conn = self.peer_conn.clone();
let fut = self.close_fut.get_or_insert(Box::pin(async move {
let peer_conn = peer_conn.lock().await;
peer_conn.close().await?;
Ok(())
}));
match ready!(fut.as_mut().poll(cx)) {
Ok(()) => {
self.incoming_data_channels_rx.close();
self.close_fut = None;
Poll::Ready(Ok(()))
}
Err(e) => {
self.close_fut = None;
Poll::Ready(Err(e))
}
}
}
}
pub(crate) async fn register_data_channel_open_handler(
data_channel: Arc<RTCDataChannel>,
data_channel_tx: Sender<Arc<DetachedDataChannel>>,
) {
data_channel.on_open({
let data_channel = data_channel.clone();
Box::new(move || {
log::debug!("Data channel {} open", data_channel.id());
Box::pin(async move {
let data_channel = data_channel.clone();
let id = data_channel.id();
match data_channel.detach().await {
Ok(detached) => {
if let Err(e) = data_channel_tx.send(detached.clone()) {
log::error!("Can't send data channel {}: {:?}", id, e);
if let Err(e) = detached.close().await {
log::error!("Failed to close data channel {}: {}", id, e);
}
}
}
Err(e) => {
log::error!("Can't detach data channel {}: {}", id, e);
}
};
})
})
});
}

View File

@ -0,0 +1,46 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use libp2p_core::PeerId;
use thiserror::Error;
/// Error in WebRTC.
#[derive(Error, Debug)]
pub enum Error {
#[error(transparent)]
WebRTC(#[from] webrtc::Error),
#[error("IO error")]
Io(#[from] std::io::Error),
#[error("failed to authenticate peer")]
Authentication(#[from] libp2p_noise::NoiseError),
// Authentication errors.
#[error("invalid peer ID (expected {expected}, got {got})")]
InvalidPeerID { expected: PeerId, got: PeerId },
#[error("no active listeners, can not dial without a previous listen")]
NoListeners,
#[error("UDP mux error: {0}")]
UDPMux(std::io::Error),
#[error("internal error: {0} (see debug logs)")]
Internal(String),
}

View File

@ -0,0 +1,116 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use multihash::{Code, Hasher, Multihash, MultihashDigest};
use webrtc::dtls_transport::dtls_fingerprint::RTCDtlsFingerprint;
use std::fmt;
const SHA256: &str = "sha-256";
/// A certificate fingerprint that is assumed to be created using the SHA256 hash algorithm.
#[derive(Eq, PartialEq, Copy, Clone)]
pub struct Fingerprint([u8; 32]);
impl Fingerprint {
pub(crate) const FF: Fingerprint = Fingerprint([0xFF; 32]);
#[cfg(test)]
pub fn raw(bytes: [u8; 32]) -> Self {
Self(bytes)
}
/// Creates a fingerprint from a raw certificate.
pub fn from_certificate(bytes: &[u8]) -> Self {
let mut h = multihash::Sha2_256::default();
h.update(bytes);
let mut bytes: [u8; 32] = [0; 32];
bytes.copy_from_slice(h.finalize());
Fingerprint(bytes)
}
/// Converts [`RTCDtlsFingerprint`] to [`Fingerprint`].
pub fn try_from_rtc_dtls(fp: &RTCDtlsFingerprint) -> Option<Self> {
if fp.algorithm != SHA256 {
return None;
}
let mut buf = [0; 32];
hex::decode_to_slice(fp.value.replace(':', ""), &mut buf).ok()?;
Some(Self(buf))
}
/// Converts [`type@Multihash`] to [`Fingerprint`].
pub fn try_from_multihash(hash: Multihash) -> Option<Self> {
if hash.code() != u64::from(Code::Sha2_256) {
// Only support SHA256 for now.
return None;
}
let bytes = hash.digest().try_into().ok()?;
Some(Self(bytes))
}
/// Converts this fingerprint to [`type@Multihash`].
pub fn to_multihash(self) -> Multihash {
Code::Sha2_256
.wrap(&self.0)
.expect("fingerprint's len to be 32 bytes")
}
/// Formats this fingerprint as uppercase hex, separated by colons (`:`).
///
/// This is the format described in <https://www.rfc-editor.org/rfc/rfc4572#section-5>.
pub fn to_sdp_format(self) -> String {
self.0.map(|byte| format!("{:02X}", byte)).join(":")
}
/// Returns the algorithm used (e.g. "sha-256").
/// See <https://datatracker.ietf.org/doc/html/rfc8122#section-5>
pub fn algorithm(&self) -> String {
SHA256.to_owned()
}
}
impl fmt::Debug for Fingerprint {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&hex::encode(self.0))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn sdp_format() {
let fp = Fingerprint::raw(hex_literal::hex!(
"7DE3D83F81A680592A471E6B6ABB0747ABD35385A8093FDFE112C1EEBB6CC6AC"
));
let sdp_format = fp.to_sdp_format();
assert_eq!(sdp_format, "7D:E3:D8:3F:81:A6:80:59:2A:47:1E:6B:6A:BB:07:47:AB:D3:53:85:A8:09:3F:DF:E1:12:C1:EE:BB:6C:C6:AC")
}
}

View File

@ -0,0 +1,35 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
pub mod certificate;
mod connection;
mod error;
mod fingerprint;
mod req_res_chan;
mod sdp;
mod substream;
mod transport;
mod udp_mux;
mod upgrade;
pub use certificate::Certificate;
pub use connection::Connection;
pub use error::Error;
pub use transport::Transport;

View File

@ -0,0 +1,75 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
};
use std::{
io,
task::{Context, Poll},
};
pub fn new<Req, Res>(capacity: usize) -> (Sender<Req, Res>, Receiver<Req, Res>) {
let (sender, receiver) = mpsc::channel(capacity);
(
Sender {
inner: futures::lock::Mutex::new(sender),
},
Receiver { inner: receiver },
)
}
pub struct Sender<Req, Res> {
inner: futures::lock::Mutex<mpsc::Sender<(Req, oneshot::Sender<Res>)>>,
}
impl<Req, Res> Sender<Req, Res> {
pub async fn send(&self, req: Req) -> io::Result<Res> {
let (sender, receiver) = oneshot::channel();
self.inner
.lock()
.await
.send((req, sender))
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let res = receiver
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(res)
}
}
pub struct Receiver<Req, Res> {
inner: mpsc::Receiver<(Req, oneshot::Sender<Res>)>,
}
impl<Req, Res> Receiver<Req, Res> {
pub fn poll_next_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<(Req, oneshot::Sender<Res>)>> {
self.inner.poll_next_unpin(cx)
}
}

View File

@ -0,0 +1,252 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use serde::Serialize;
use tinytemplate::TinyTemplate;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use std::net::{IpAddr, SocketAddr};
use crate::tokio::fingerprint::Fingerprint;
/// Creates the SDP answer used by the client.
pub fn answer(
addr: SocketAddr,
server_fingerprint: &Fingerprint,
client_ufrag: &str,
) -> RTCSessionDescription {
RTCSessionDescription::answer(render_description(
SERVER_SESSION_DESCRIPTION,
addr,
server_fingerprint,
client_ufrag,
))
.unwrap()
}
/// Creates the SDP offer used by the server.
///
/// Certificate verification is disabled which is why we hardcode a dummy fingerprint here.
pub fn offer(addr: SocketAddr, client_ufrag: &str) -> RTCSessionDescription {
RTCSessionDescription::offer(render_description(
CLIENT_SESSION_DESCRIPTION,
addr,
&Fingerprint::FF,
client_ufrag,
))
.unwrap()
}
// An SDP message that constitutes the offer.
//
// Main RFC: <https://datatracker.ietf.org/doc/html/rfc8866>
// `sctp-port` and `max-message-size` attrs RFC: <https://datatracker.ietf.org/doc/html/rfc8841>
// `group` and `mid` attrs RFC: <https://datatracker.ietf.org/doc/html/rfc9143>
// `ice-ufrag`, `ice-pwd` and `ice-options` attrs RFC: <https://datatracker.ietf.org/doc/html/rfc8839>
// `setup` attr RFC: <https://datatracker.ietf.org/doc/html/rfc8122>
//
// Short description:
//
// v=<protocol-version> -> always 0
// o=<username> <sess-id> <sess-version> <nettype> <addrtype> <unicast-address>
//
// <username> identifies the creator of the SDP document. We are allowed to use dummy values
// (`-` and `0.0.0.0` as <addrtype>) to remain anonymous, which we do. Note that "IN" means
// "Internet".
//
// s=<session name>
//
// We are allowed to pass a dummy `-`.
//
// c=<nettype> <addrtype> <connection-address>
//
// Indicates the IP address of the remote.
// Note that "IN" means "Internet".
//
// t=<start-time> <stop-time>
//
// Start and end of the validity of the session. `0 0` means that the session never expires.
//
// m=<media> <port> <proto> <fmt> ...
//
// A `m=` line describes a request to establish a certain protocol. The protocol in this line
// (i.e. `TCP/DTLS/SCTP` or `UDP/DTLS/SCTP`) must always be the same as the one in the offer.
// We know that this is true because we tweak the offer to match the protocol. The `<fmt>`
// component must always be `webrtc-datachannel` for WebRTC.
// RFCs: 8839, 8866, 8841
//
// a=mid:<MID>
//
// Media ID - uniquely identifies this media stream (RFC9143).
//
// a=ice-options:ice2
//
// Indicates that we are complying with RFC8839 (as oppposed to the legacy RFC5245).
//
// a=ice-ufrag:<ICE user>
// a=ice-pwd:<ICE password>
//
// ICE username and password, which are used for establishing and
// maintaining the ICE connection. (RFC8839)
// MUST match ones used by the answerer (server).
//
// a=fingerprint:sha-256 <fingerprint>
//
// Fingerprint of the certificate that the remote will use during the TLS
// handshake. (RFC8122)
//
// a=setup:actpass
//
// The endpoint that is the offerer MUST use the setup attribute value of setup:actpass and be
// prepared to receive a client_hello before it receives the answer.
//
// a=sctp-port:<value>
//
// The SCTP port (RFC8841)
// Note it's different from the "m=" line port value, which indicates the port of the
// underlying transport-layer protocol (UDP or TCP).
//
// a=max-message-size:<value>
//
// The maximum SCTP user message size (in bytes). (RFC8841)
const CLIENT_SESSION_DESCRIPTION: &str = "v=0
o=- 0 0 IN {ip_version} {target_ip}
s=-
c=IN {ip_version} {target_ip}
t=0 0
m=application {target_port} UDP/DTLS/SCTP webrtc-datachannel
a=mid:0
a=ice-options:ice2
a=ice-ufrag:{ufrag}
a=ice-pwd:{pwd}
a=fingerprint:{fingerprint_algorithm} {fingerprint_value}
a=setup:actpass
a=sctp-port:5000
a=max-message-size:16384
";
// See [`CLIENT_SESSION_DESCRIPTION`].
//
// a=ice-lite
//
// A lite implementation is only appropriate for devices that will *always* be connected to
// the public Internet and have a public IP address at which it can receive packets from any
// correspondent. ICE will not function when a lite implementation is placed behind a NAT
// (RFC8445).
//
// a=tls-id:<id>
//
// "TLS ID" uniquely identifies a TLS association.
// The ICE protocol uses a "TLS ID" system to indicate whether a fresh DTLS connection
// must be reopened in case of ICE renegotiation. Considering that ICE renegotiations
// never happen in our use case, we can simply put a random value and not care about
// it. Note however that the TLS ID in the answer must be present if and only if the
// offer contains one. (RFC8842)
// TODO: is it true that renegotiations never happen? what about a connection closing?
// "tls-id" attribute MUST be present in the initial offer and respective answer (RFC8839).
// XXX: but right now browsers don't send it.
//
// a=setup:passive
//
// "passive" indicates that the remote DTLS server will only listen for incoming
// connections. (RFC5763)
// The answerer (server) MUST not be located behind a NAT (RFC6135).
//
// The answerer MUST use either a setup attribute value of setup:active or setup:passive.
// Note that if the answerer uses setup:passive, then the DTLS handshake will not begin until
// the answerer is received, which adds additional latency. setup:active allows the answer and
// the DTLS handshake to occur in parallel. Thus, setup:active is RECOMMENDED.
//
// a=candidate:<foundation> <component-id> <transport> <priority> <connection-address> <port> <cand-type>
//
// A transport address for a candidate that can be used for connectivity checks (RFC8839).
//
// a=end-of-candidates
//
// Indicate that no more candidates will ever be sent (RFC8838).
const SERVER_SESSION_DESCRIPTION: &str = "v=0
o=- 0 0 IN {ip_version} {target_ip}
s=-
t=0 0
a=ice-lite
m=application {target_port} UDP/DTLS/SCTP webrtc-datachannel
c=IN {ip_version} {target_ip}
a=mid:0
a=ice-options:ice2
a=ice-ufrag:{ufrag}
a=ice-pwd:{pwd}
a=fingerprint:{fingerprint_algorithm} {fingerprint_value}
a=setup:passive
a=sctp-port:5000
a=max-message-size:16384
a=candidate:1 1 UDP 1 {target_ip} {target_port} typ host
a=end-of-candidates
";
/// Indicates the IP version used in WebRTC: `IP4` or `IP6`.
#[derive(Serialize)]
enum IpVersion {
IP4,
IP6,
}
/// Context passed to the templating engine, which replaces the above placeholders (e.g.
/// `{IP_VERSION}`) with real values.
#[derive(Serialize)]
struct DescriptionContext {
pub ip_version: IpVersion,
pub target_ip: IpAddr,
pub target_port: u16,
pub fingerprint_algorithm: String,
pub fingerprint_value: String,
pub ufrag: String,
pub pwd: String,
}
/// Renders a [`TinyTemplate`] description using the provided arguments.
fn render_description(
description: &str,
addr: SocketAddr,
fingerprint: &Fingerprint,
ufrag: &str,
) -> String {
let mut tt = TinyTemplate::new();
tt.add_template("description", description).unwrap();
let context = DescriptionContext {
ip_version: {
if addr.is_ipv4() {
IpVersion::IP4
} else {
IpVersion::IP6
}
},
target_ip: addr.ip(),
target_port: addr.port(),
fingerprint_algorithm: fingerprint.algorithm(),
fingerprint_value: fingerprint.to_sdp_format(),
// NOTE: ufrag is equal to pwd.
ufrag: ufrag.to_owned(),
pwd: ufrag.to_owned(),
};
tt.render("description", &context).unwrap()
}

View File

@ -0,0 +1,295 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use asynchronous_codec::Framed;
use bytes::Bytes;
use futures::{channel::oneshot, prelude::*, ready};
use tokio_util::compat::Compat;
use webrtc::data::data_channel::{DataChannel, PollDataChannel};
use std::{
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use crate::message_proto::{message::Flag, Message};
use crate::tokio::{
substream::drop_listener::GracefullyClosed,
substream::framed_dc::FramedDc,
substream::state::{Closing, State},
};
mod drop_listener;
mod framed_dc;
mod state;
/// Maximum length of a message.
///
/// "As long as message interleaving is not supported, the sender SHOULD limit the maximum message
/// size to 16 KB to avoid monopolization."
/// Source: <https://www.rfc-editor.org/rfc/rfc8831#name-transferring-user-data-on-a>
const MAX_MSG_LEN: usize = 16384; // 16kiB
/// Length of varint, in bytes.
const VARINT_LEN: usize = 2;
/// Overhead of the protobuf encoding, in bytes.
const PROTO_OVERHEAD: usize = 5;
/// Maximum length of data, in bytes.
const MAX_DATA_LEN: usize = MAX_MSG_LEN - VARINT_LEN - PROTO_OVERHEAD;
pub use drop_listener::DropListener;
/// A substream on top of a WebRTC data channel.
///
/// To be a proper libp2p substream, we need to implement [`AsyncRead`] and [`AsyncWrite`] as well
/// as support a half-closed state which we do by framing messages in a protobuf envelope.
pub struct Substream {
io: FramedDc,
state: State,
read_buffer: Bytes,
/// Dropping this will close the oneshot and notify the receiver by emitting `Canceled`.
drop_notifier: Option<oneshot::Sender<GracefullyClosed>>,
}
impl Substream {
/// Returns a new `Substream` and a listener, which will notify the receiver when/if the substream
/// is dropped.
pub(crate) fn new(data_channel: Arc<DataChannel>) -> (Self, DropListener) {
let (sender, receiver) = oneshot::channel();
let substream = Self {
io: framed_dc::new(data_channel.clone()),
state: State::Open,
read_buffer: Bytes::default(),
drop_notifier: Some(sender),
};
let listener = DropListener::new(framed_dc::new(data_channel), receiver);
(substream, listener)
}
/// Gracefully closes the "read-half" of the substream.
pub fn poll_close_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
loop {
match self.state.close_read_barrier()? {
Some(Closing::Requested) => {
ready!(self.io.poll_ready_unpin(cx))?;
self.io.start_send_unpin(Message {
flag: Some(Flag::StopSending.into()),
message: None,
})?;
self.state.close_read_message_sent();
continue;
}
Some(Closing::MessageSent) => {
ready!(self.io.poll_flush_unpin(cx))?;
self.state.read_closed();
return Poll::Ready(Ok(()));
}
None => return Poll::Ready(Ok(())),
}
}
}
}
impl AsyncRead for Substream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
loop {
self.state.read_barrier()?;
if !self.read_buffer.is_empty() {
let n = std::cmp::min(self.read_buffer.len(), buf.len());
let data = self.read_buffer.split_to(n);
buf[0..n].copy_from_slice(&data[..]);
return Poll::Ready(Ok(n));
}
let Self {
read_buffer,
io,
state,
..
} = &mut *self;
match ready!(io_poll_next(io, cx))? {
Some((flag, message)) => {
if let Some(flag) = flag {
state.handle_inbound_flag(flag, read_buffer);
}
debug_assert!(read_buffer.is_empty());
if let Some(message) = message {
*read_buffer = message.into();
}
}
None => {
state.handle_inbound_flag(Flag::Fin, read_buffer);
return Poll::Ready(Ok(0));
}
}
}
}
}
impl AsyncWrite for Substream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
while self.state.read_flags_in_async_write() {
// TODO: In case AsyncRead::poll_read encountered an error or returned None earlier, we will poll the
// underlying I/O resource once more. Is that allowed? How about introducing a state IoReadClosed?
let Self {
read_buffer,
io,
state,
..
} = &mut *self;
match io_poll_next(io, cx)? {
Poll::Ready(Some((Some(flag), message))) => {
// Read side is closed. Discard any incoming messages.
drop(message);
// But still handle flags, e.g. a `Flag::StopSending`.
state.handle_inbound_flag(flag, read_buffer)
}
Poll::Ready(Some((None, message))) => drop(message),
Poll::Ready(None) | Poll::Pending => break,
}
}
self.state.write_barrier()?;
ready!(self.io.poll_ready_unpin(cx))?;
let n = usize::min(buf.len(), MAX_DATA_LEN);
Pin::new(&mut self.io).start_send(Message {
flag: None,
message: Some(buf[0..n].into()),
})?;
Poll::Ready(Ok(n))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.poll_flush_unpin(cx).map_err(Into::into)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
loop {
match self.state.close_write_barrier()? {
Some(Closing::Requested) => {
ready!(self.io.poll_ready_unpin(cx))?;
self.io.start_send_unpin(Message {
flag: Some(Flag::Fin.into()),
message: None,
})?;
self.state.close_write_message_sent();
continue;
}
Some(Closing::MessageSent) => {
ready!(self.io.poll_flush_unpin(cx))?;
self.state.write_closed();
let _ = self
.drop_notifier
.take()
.expect("to not close twice")
.send(GracefullyClosed {});
return Poll::Ready(Ok(()));
}
None => return Poll::Ready(Ok(())),
}
}
}
}
fn io_poll_next(
io: &mut Framed<Compat<PollDataChannel>, prost_codec::Codec<Message>>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Option<(Option<Flag>, Option<Vec<u8>>)>>> {
match ready!(io.poll_next_unpin(cx))
.transpose()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
{
Some(Message { flag, message }) => {
let flag = flag
.map(|f| {
Flag::from_i32(f).ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, ""))
})
.transpose()?;
Poll::Ready(Ok(Some((flag, message))))
}
None => Poll::Ready(Ok(None)),
}
}
#[cfg(test)]
mod tests {
use super::*;
use asynchronous_codec::Encoder;
use bytes::BytesMut;
use prost::Message;
use unsigned_varint::codec::UviBytes;
#[test]
fn max_data_len() {
// Largest possible message.
let message = [0; MAX_DATA_LEN];
let protobuf = crate::message_proto::Message {
flag: Some(crate::message_proto::message::Flag::Fin.into()),
message: Some(message.to_vec()),
};
let mut encoded_msg = BytesMut::new();
protobuf
.encode(&mut encoded_msg)
.expect("BytesMut to have sufficient capacity.");
assert_eq!(encoded_msg.len(), message.len() + PROTO_OVERHEAD);
let mut uvi = UviBytes::default();
let mut dst = BytesMut::new();
uvi.encode(encoded_msg.clone().freeze(), &mut dst).unwrap();
// Ensure the varint prefixed and protobuf encoded largest message is no longer than the
// maximum limit specified in the libp2p WebRTC specification.
assert_eq!(dst.len(), MAX_MSG_LEN);
assert_eq!(dst.len() - encoded_msg.len(), VARINT_LEN);
}
}

View File

@ -0,0 +1,130 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::channel::oneshot;
use futures::channel::oneshot::Canceled;
use futures::{FutureExt, SinkExt};
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::message_proto::{message::Flag, Message};
use crate::tokio::substream::framed_dc::FramedDc;
#[must_use]
pub struct DropListener {
state: State,
}
impl DropListener {
pub fn new(stream: FramedDc, receiver: oneshot::Receiver<GracefullyClosed>) -> Self {
let substream_id = stream.get_ref().stream_identifier();
Self {
state: State::Idle {
stream,
receiver,
substream_id,
},
}
}
}
enum State {
/// The [`DropListener`] is idle and waiting to be activated.
Idle {
stream: FramedDc,
receiver: oneshot::Receiver<GracefullyClosed>,
substream_id: u16,
},
/// The stream got dropped and we are sending a reset flag.
SendingReset {
stream: FramedDc,
},
Flushing {
stream: FramedDc,
},
/// Bad state transition.
Poisoned,
}
impl Future for DropListener {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let state = &mut self.get_mut().state;
loop {
match std::mem::replace(state, State::Poisoned) {
State::Idle {
stream,
substream_id,
mut receiver,
} => match receiver.poll_unpin(cx) {
Poll::Ready(Ok(GracefullyClosed {})) => {
return Poll::Ready(Ok(()));
}
Poll::Ready(Err(Canceled)) => {
log::info!("Substream {substream_id} dropped without graceful close, sending Reset");
*state = State::SendingReset { stream };
continue;
}
Poll::Pending => {
*state = State::Idle {
stream,
substream_id,
receiver,
};
return Poll::Pending;
}
},
State::SendingReset { mut stream } => match stream.poll_ready_unpin(cx)? {
Poll::Ready(()) => {
stream.start_send_unpin(Message {
flag: Some(Flag::Reset.into()),
message: None,
})?;
*state = State::Flushing { stream };
continue;
}
Poll::Pending => {
*state = State::SendingReset { stream };
return Poll::Pending;
}
},
State::Flushing { mut stream } => match stream.poll_flush_unpin(cx)? {
Poll::Ready(()) => return Poll::Ready(Ok(())),
Poll::Pending => {
*state = State::Flushing { stream };
return Poll::Pending;
}
},
State::Poisoned => {
unreachable!()
}
}
}
}
}
/// Indicates that our substream got gracefully closed.
pub struct GracefullyClosed {}

View File

@ -0,0 +1,45 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use asynchronous_codec::Framed;
use tokio_util::compat::Compat;
use tokio_util::compat::TokioAsyncReadCompatExt;
use webrtc::data::data_channel::{DataChannel, PollDataChannel};
use std::sync::Arc;
use super::{MAX_DATA_LEN, MAX_MSG_LEN, VARINT_LEN};
use crate::message_proto::Message;
pub type FramedDc = Framed<Compat<PollDataChannel>, prost_codec::Codec<Message>>;
pub fn new(data_channel: Arc<DataChannel>) -> FramedDc {
let mut inner = PollDataChannel::new(data_channel);
inner.set_read_buf_capacity(MAX_MSG_LEN);
let mut framed = Framed::new(
inner.compat(),
prost_codec::Codec::new(MAX_MSG_LEN - VARINT_LEN),
);
// If not set, `Framed` buffers up to 131kB of data before sending, which leads to "outbound
// packet larger than maximum message size" error in webrtc-rs.
framed.set_send_high_water_mark(MAX_DATA_LEN);
framed
}

View File

@ -0,0 +1,510 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use std::io;
use crate::message_proto::message::Flag;
#[derive(Debug, Copy, Clone)]
pub enum State {
Open,
ReadClosed,
WriteClosed,
ClosingRead {
/// Whether the write side of our channel was already closed.
write_closed: bool,
inner: Closing,
},
ClosingWrite {
/// Whether the write side of our channel was already closed.
read_closed: bool,
inner: Closing,
},
BothClosed {
reset: bool,
},
}
/// Represents the state of closing one half (either read or write) of the connection.
///
/// Gracefully closing the read or write requires sending the `STOP_SENDING` or `FIN` flag respectively
/// and flushing the underlying connection.
#[derive(Debug, Copy, Clone)]
pub enum Closing {
Requested,
MessageSent,
}
impl State {
/// Performs a state transition for a flag contained in an inbound message.
pub(crate) fn handle_inbound_flag(&mut self, flag: Flag, buffer: &mut Bytes) {
let current = *self;
match (current, flag) {
(Self::Open, Flag::Fin) => {
*self = Self::ReadClosed;
}
(Self::WriteClosed, Flag::Fin) => {
*self = Self::BothClosed { reset: false };
}
(Self::Open, Flag::StopSending) => {
*self = Self::WriteClosed;
}
(Self::ReadClosed, Flag::StopSending) => {
*self = Self::BothClosed { reset: false };
}
(_, Flag::Reset) => {
buffer.clear();
*self = Self::BothClosed { reset: true };
}
_ => {}
}
}
pub(crate) fn write_closed(&mut self) {
match self {
State::ClosingWrite {
read_closed: true,
inner,
} => {
debug_assert!(matches!(inner, Closing::MessageSent));
*self = State::BothClosed { reset: false };
}
State::ClosingWrite {
read_closed: false,
inner,
} => {
debug_assert!(matches!(inner, Closing::MessageSent));
*self = State::WriteClosed;
}
State::Open
| State::ReadClosed
| State::WriteClosed
| State::ClosingRead { .. }
| State::BothClosed { .. } => {
unreachable!("bad state machine impl")
}
}
}
pub(crate) fn close_write_message_sent(&mut self) {
match self {
State::ClosingWrite { inner, read_closed } => {
debug_assert!(matches!(inner, Closing::Requested));
*self = State::ClosingWrite {
read_closed: *read_closed,
inner: Closing::MessageSent,
};
}
State::Open
| State::ReadClosed
| State::WriteClosed
| State::ClosingRead { .. }
| State::BothClosed { .. } => {
unreachable!("bad state machine impl")
}
}
}
pub(crate) fn read_closed(&mut self) {
match self {
State::ClosingRead {
write_closed: true,
inner,
} => {
debug_assert!(matches!(inner, Closing::MessageSent));
*self = State::BothClosed { reset: false };
}
State::ClosingRead {
write_closed: false,
inner,
} => {
debug_assert!(matches!(inner, Closing::MessageSent));
*self = State::ReadClosed;
}
State::Open
| State::ReadClosed
| State::WriteClosed
| State::ClosingWrite { .. }
| State::BothClosed { .. } => {
unreachable!("bad state machine impl")
}
}
}
pub(crate) fn close_read_message_sent(&mut self) {
match self {
State::ClosingRead {
inner,
write_closed,
} => {
debug_assert!(matches!(inner, Closing::Requested));
*self = State::ClosingRead {
write_closed: *write_closed,
inner: Closing::MessageSent,
};
}
State::Open
| State::ReadClosed
| State::WriteClosed
| State::ClosingWrite { .. }
| State::BothClosed { .. } => {
unreachable!("bad state machine impl")
}
}
}
/// Whether we should read from the stream in the [`futures::AsyncWrite`] implementation.
///
/// This is necessary for read-closed streams because we would otherwise not read any more flags from
/// the socket.
pub(crate) fn read_flags_in_async_write(&self) -> bool {
matches!(self, Self::ReadClosed)
}
/// Acts as a "barrier" for [`futures::AsyncRead::poll_read`].
pub(crate) fn read_barrier(&self) -> io::Result<()> {
use crate::tokio::substream::State::{Open, ReadClosed, WriteClosed};
use State::*;
let kind = match self {
Open
| WriteClosed
| ClosingWrite {
read_closed: false, ..
} => return Ok(()),
ClosingWrite {
read_closed: true, ..
}
| ReadClosed
| ClosingRead { .. }
| BothClosed { reset: false } => io::ErrorKind::BrokenPipe,
BothClosed { reset: true } => io::ErrorKind::ConnectionReset,
};
Err(kind.into())
}
/// Acts as a "barrier" for [`futures::AsyncWrite::poll_write`].
pub(crate) fn write_barrier(&self) -> io::Result<()> {
use crate::tokio::substream::State::{Open, ReadClosed, WriteClosed};
use State::*;
let kind = match self {
Open
| ReadClosed
| ClosingRead {
write_closed: false,
..
} => return Ok(()),
ClosingRead {
write_closed: true, ..
}
| WriteClosed
| ClosingWrite { .. }
| BothClosed { reset: false } => io::ErrorKind::BrokenPipe,
BothClosed { reset: true } => io::ErrorKind::ConnectionReset,
};
Err(kind.into())
}
/// Acts as a "barrier" for [`futures::AsyncWrite::poll_close`].
pub(crate) fn close_write_barrier(&mut self) -> io::Result<Option<Closing>> {
loop {
match &self {
State::WriteClosed => return Ok(None),
State::ClosingWrite { inner, .. } => return Ok(Some(*inner)),
State::Open => {
*self = Self::ClosingWrite {
read_closed: false,
inner: Closing::Requested,
};
}
State::ReadClosed => {
*self = Self::ClosingWrite {
read_closed: true,
inner: Closing::Requested,
};
}
State::ClosingRead {
write_closed: true, ..
}
| State::BothClosed { reset: false } => {
return Err(io::ErrorKind::BrokenPipe.into())
}
State::ClosingRead {
write_closed: false,
..
} => {
return Err(io::Error::new(
io::ErrorKind::Other,
"cannot close read half while closing write half",
))
}
State::BothClosed { reset: true } => {
return Err(io::ErrorKind::ConnectionReset.into())
}
}
}
}
/// Acts as a "barrier" for [`Substream::poll_close_read`](super::Substream::poll_close_read).
pub fn close_read_barrier(&mut self) -> io::Result<Option<Closing>> {
loop {
match self {
State::ReadClosed => return Ok(None),
State::ClosingRead { inner, .. } => return Ok(Some(*inner)),
State::Open => {
*self = Self::ClosingRead {
write_closed: false,
inner: Closing::Requested,
};
}
State::WriteClosed => {
*self = Self::ClosingRead {
write_closed: true,
inner: Closing::Requested,
};
}
State::ClosingWrite {
read_closed: true, ..
}
| State::BothClosed { reset: false } => {
return Err(io::ErrorKind::BrokenPipe.into())
}
State::ClosingWrite {
read_closed: false, ..
} => {
return Err(io::Error::new(
io::ErrorKind::Other,
"cannot close write half while closing read half",
))
}
State::BothClosed { reset: true } => {
return Err(io::ErrorKind::ConnectionReset.into())
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::ErrorKind;
#[test]
fn cannot_read_after_receiving_fin() {
let mut open = State::Open;
open.handle_inbound_flag(Flag::Fin, &mut Bytes::default());
let error = open.read_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe)
}
#[test]
fn cannot_read_after_closing_read() {
let mut open = State::Open;
open.close_read_barrier().unwrap();
open.close_read_message_sent();
open.read_closed();
let error = open.read_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe)
}
#[test]
fn cannot_write_after_receiving_stop_sending() {
let mut open = State::Open;
open.handle_inbound_flag(Flag::StopSending, &mut Bytes::default());
let error = open.write_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe)
}
#[test]
fn cannot_write_after_closing_write() {
let mut open = State::Open;
open.close_write_barrier().unwrap();
open.close_write_message_sent();
open.write_closed();
let error = open.write_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe)
}
#[test]
fn everything_broken_after_receiving_reset() {
let mut open = State::Open;
open.handle_inbound_flag(Flag::Reset, &mut Bytes::default());
let error1 = open.read_barrier().unwrap_err();
let error2 = open.write_barrier().unwrap_err();
let error3 = open.close_write_barrier().unwrap_err();
let error4 = open.close_read_barrier().unwrap_err();
assert_eq!(error1.kind(), ErrorKind::ConnectionReset);
assert_eq!(error2.kind(), ErrorKind::ConnectionReset);
assert_eq!(error3.kind(), ErrorKind::ConnectionReset);
assert_eq!(error4.kind(), ErrorKind::ConnectionReset);
}
#[test]
fn should_read_flags_in_async_write_after_read_closed() {
let mut open = State::Open;
open.handle_inbound_flag(Flag::Fin, &mut Bytes::default());
assert!(open.read_flags_in_async_write())
}
#[test]
fn cannot_read_or_write_after_receiving_fin_and_stop_sending() {
let mut open = State::Open;
open.handle_inbound_flag(Flag::Fin, &mut Bytes::default());
open.handle_inbound_flag(Flag::StopSending, &mut Bytes::default());
let error1 = open.read_barrier().unwrap_err();
let error2 = open.write_barrier().unwrap_err();
assert_eq!(error1.kind(), ErrorKind::BrokenPipe);
assert_eq!(error2.kind(), ErrorKind::BrokenPipe);
}
#[test]
fn can_read_after_closing_write() {
let mut open = State::Open;
open.close_write_barrier().unwrap();
open.close_write_message_sent();
open.write_closed();
open.read_barrier().unwrap();
}
#[test]
fn can_write_after_closing_read() {
let mut open = State::Open;
open.close_read_barrier().unwrap();
open.close_read_message_sent();
open.read_closed();
open.write_barrier().unwrap();
}
#[test]
fn cannot_write_after_starting_close() {
let mut open = State::Open;
open.close_write_barrier().expect("to close in open");
let error = open.write_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe);
}
#[test]
fn cannot_read_after_starting_close() {
let mut open = State::Open;
open.close_read_barrier().expect("to close in open");
let error = open.read_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe);
}
#[test]
fn can_read_in_open() {
let open = State::Open;
let result = open.read_barrier();
result.unwrap();
}
#[test]
fn can_write_in_open() {
let open = State::Open;
let result = open.write_barrier();
result.unwrap();
}
#[test]
fn write_close_barrier_returns_ok_when_closed() {
let mut open = State::Open;
open.close_write_barrier().unwrap();
open.close_write_message_sent();
open.write_closed();
let maybe = open.close_write_barrier().unwrap();
assert!(maybe.is_none())
}
#[test]
fn read_close_barrier_returns_ok_when_closed() {
let mut open = State::Open;
open.close_read_barrier().unwrap();
open.close_read_message_sent();
open.read_closed();
let maybe = open.close_read_barrier().unwrap();
assert!(maybe.is_none())
}
#[test]
fn reset_flag_clears_buffer() {
let mut open = State::Open;
let mut buffer = Bytes::copy_from_slice(b"foobar");
open.handle_inbound_flag(Flag::Reset, &mut buffer);
assert!(buffer.is_empty());
}
}

View File

@ -0,0 +1,604 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{future::BoxFuture, prelude::*, ready, stream::SelectAll, stream::Stream};
use if_watch::{IfEvent, IfWatcher};
use libp2p_core::{
identity,
multiaddr::{Multiaddr, Protocol},
transport::{ListenerId, TransportError, TransportEvent},
PeerId,
};
use webrtc::peer_connection::configuration::RTCConfiguration;
use std::net::IpAddr;
use std::{
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use crate::tokio::{
certificate::Certificate,
connection::Connection,
error::Error,
fingerprint::Fingerprint,
udp_mux::{UDPMuxEvent, UDPMuxNewAddr},
upgrade,
};
/// A WebRTC transport with direct p2p communication (without a STUN server).
pub struct Transport {
/// The config which holds this peer's keys and certificate.
config: Config,
/// All the active listeners.
listeners: SelectAll<ListenStream>,
}
impl Transport {
/// Creates a new WebRTC transport.
///
/// # Example
///
/// ```
/// use libp2p_core::identity;
/// use rand::thread_rng;
/// use libp2p_webrtc::tokio::{Transport, Certificate};
///
/// let id_keys = identity::Keypair::generate_ed25519();
/// let transport = Transport::new(id_keys, Certificate::generate(&mut thread_rng()).unwrap());
/// ```
pub fn new(id_keys: identity::Keypair, certificate: Certificate) -> Self {
Self {
config: Config::new(id_keys, certificate),
listeners: SelectAll::new(),
}
}
}
impl libp2p_core::Transport for Transport {
type Output = (PeerId, Connection);
type Error = Error;
type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
let id = ListenerId::new();
let socket_addr =
parse_webrtc_listen_addr(&addr).ok_or(TransportError::MultiaddrNotSupported(addr))?;
let udp_mux = UDPMuxNewAddr::listen_on(socket_addr)
.map_err(|io| TransportError::Other(Error::Io(io)))?;
self.listeners.push(ListenStream::new(
id,
self.config.clone(),
udp_mux,
IfWatcher::new().map_err(|io| TransportError::Other(Error::Io(io)))?,
));
Ok(id)
}
fn remove_listener(&mut self, id: ListenerId) -> bool {
if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
listener.close(Ok(()));
true
} else {
false
}
}
/// Poll all listeners.
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
match self.listeners.poll_next_unpin(cx) {
Poll::Ready(Some(ev)) => Poll::Ready(ev),
_ => Poll::Pending,
}
}
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let (sock_addr, server_fingerprint) = parse_webrtc_dial_addr(&addr)
.ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?;
if sock_addr.port() == 0 || sock_addr.ip().is_unspecified() {
return Err(TransportError::MultiaddrNotSupported(addr));
}
let config = self.config.clone();
let client_fingerprint = self.config.fingerprint;
let udp_mux = self
.listeners
.iter()
.next()
.ok_or(TransportError::Other(Error::NoListeners))?
.udp_mux
.udp_mux_handle();
Ok(async move {
let (peer_id, connection) = upgrade::outbound(
sock_addr,
config.inner,
udp_mux,
client_fingerprint,
server_fingerprint,
config.id_keys,
)
.await?;
Ok((peer_id, connection))
}
.boxed())
}
fn dial_as_listener(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
// TODO: As the listener of a WebRTC hole punch, we need to send a random UDP packet to the
// `addr`. See DCUtR specification below.
//
// https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol
self.dial(addr)
}
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
libp2p_core::address_translation(server, observed)
}
}
/// A stream of incoming connections on one or more interfaces.
struct ListenStream {
/// The ID of this listener.
listener_id: ListenerId,
/// The socket address that the listening socket is bound to,
/// which may be a "wildcard address" like `INADDR_ANY` or `IN6ADDR_ANY`
/// when listening on all interfaces for IPv4 respectively IPv6 connections.
listen_addr: SocketAddr,
/// The config which holds this peer's certificate(s).
config: Config,
/// The UDP muxer that manages all ICE connections.
udp_mux: UDPMuxNewAddr,
/// Set to `Some` if this listener should close.
///
/// Optionally contains a [`TransportEvent::ListenerClosed`] that should be
/// reported before the listener's stream is terminated.
report_closed: Option<Option<<Self as Stream>::Item>>,
/// Watcher for network interface changes.
/// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces
/// become or stop being available.
///
/// `None` if the socket is only listening on a single interface.
if_watcher: IfWatcher,
}
impl ListenStream {
/// Constructs a `WebRTCListenStream` for incoming connections.
fn new(
listener_id: ListenerId,
config: Config,
udp_mux: UDPMuxNewAddr,
if_watcher: IfWatcher,
) -> Self {
ListenStream {
listener_id,
listen_addr: udp_mux.listen_addr(),
config,
udp_mux,
report_closed: None,
if_watcher,
}
}
/// Report the listener as closed in a [`TransportEvent::ListenerClosed`] and
/// terminate the stream.
fn close(&mut self, reason: Result<(), Error>) {
match self.report_closed {
Some(_) => log::debug!("Listener was already closed."),
None => {
// Report the listener event as closed.
let _ = self
.report_closed
.insert(Some(TransportEvent::ListenerClosed {
listener_id: self.listener_id,
reason,
}));
}
}
}
fn poll_if_watcher(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
while let Poll::Ready(event) = self.if_watcher.poll_if_event(cx) {
match event {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if self.listen_addr.is_ipv4() == ip.is_ipv4()
|| self.listen_addr.is_ipv6() == ip.is_ipv6()
{
return Poll::Ready(TransportEvent::NewAddress {
listener_id: self.listener_id,
listen_addr: self
.listen_multiaddress(ip, self.config.id_keys.public().to_peer_id()),
});
}
}
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if self.listen_addr.is_ipv4() == ip.is_ipv4()
|| self.listen_addr.is_ipv6() == ip.is_ipv6()
{
return Poll::Ready(TransportEvent::AddressExpired {
listener_id: self.listener_id,
listen_addr: self
.listen_multiaddress(ip, self.config.id_keys.public().to_peer_id()),
});
}
}
Err(err) => {
return Poll::Ready(TransportEvent::ListenerError {
listener_id: self.listener_id,
error: Error::Io(err),
});
}
}
}
Poll::Pending
}
/// Constructs a [`Multiaddr`] for the given IP address that represents our listen address.
fn listen_multiaddress(&self, ip: IpAddr, local_peer_id: PeerId) -> Multiaddr {
let socket_addr = SocketAddr::new(ip, self.listen_addr.port());
socketaddr_to_multiaddr(&socket_addr, Some(self.config.fingerprint))
.with(Protocol::P2p(*local_peer_id.as_ref()))
}
}
impl Stream for ListenStream {
type Item = TransportEvent<<Transport as libp2p_core::Transport>::ListenerUpgrade, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
if let Some(closed) = self.report_closed.as_mut() {
// Listener was closed.
// Report the transport event if there is one. On the next iteration, return
// `Poll::Ready(None)` to terminate the stream.
return Poll::Ready(closed.take());
}
if let Poll::Ready(event) = self.poll_if_watcher(cx) {
return Poll::Ready(Some(event));
}
// Poll UDP muxer for new addresses or incoming data for streams.
match ready!(self.udp_mux.poll(cx)) {
UDPMuxEvent::NewAddr(new_addr) => {
let local_addr =
socketaddr_to_multiaddr(&self.listen_addr, Some(self.config.fingerprint));
let send_back_addr = socketaddr_to_multiaddr(&new_addr.addr, None);
let upgrade = upgrade::inbound(
new_addr.addr,
self.config.inner.clone(),
self.udp_mux.udp_mux_handle(),
self.config.fingerprint,
new_addr.ufrag,
self.config.id_keys.clone(),
)
.boxed();
return Poll::Ready(Some(TransportEvent::Incoming {
upgrade,
local_addr,
send_back_addr,
listener_id: self.listener_id,
}));
}
UDPMuxEvent::Error(e) => {
self.close(Err(Error::UDPMux(e)));
}
}
}
}
}
/// A config which holds peer's keys and a x509Cert used to authenticate WebRTC communications.
#[derive(Clone)]
struct Config {
inner: RTCConfiguration,
fingerprint: Fingerprint,
id_keys: identity::Keypair,
}
impl Config {
/// Returns a new [`Config`] with the given keys and certificate.
fn new(id_keys: identity::Keypair, certificate: Certificate) -> Self {
let fingerprint = certificate.fingerprint();
Self {
id_keys,
inner: RTCConfiguration {
certificates: vec![certificate.to_rtc_certificate()],
..RTCConfiguration::default()
},
fingerprint,
}
}
}
/// Turns an IP address and port into the corresponding WebRTC multiaddr.
fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, certhash: Option<Fingerprint>) -> Multiaddr {
let addr = Multiaddr::empty()
.with(socket_addr.ip().into())
.with(Protocol::Udp(socket_addr.port()))
.with(Protocol::WebRTC);
if let Some(fp) = certhash {
return addr.with(Protocol::Certhash(fp.to_multihash()));
}
addr
}
/// Parse the given [`Multiaddr`] into a [`SocketAddr`] for listening.
fn parse_webrtc_listen_addr(addr: &Multiaddr) -> Option<SocketAddr> {
let mut iter = addr.iter();
let ip = match iter.next()? {
Protocol::Ip4(ip) => IpAddr::from(ip),
Protocol::Ip6(ip) => IpAddr::from(ip),
_ => return None,
};
let port = iter.next()?;
let webrtc = iter.next()?;
let port = match (port, webrtc) {
(Protocol::Udp(port), Protocol::WebRTC) => port,
_ => return None,
};
if iter.next().is_some() {
return None;
}
Some(SocketAddr::new(ip, port))
}
/// Parse the given [`Multiaddr`] into a [`SocketAddr`] and a [`Fingerprint`] for dialing.
fn parse_webrtc_dial_addr(addr: &Multiaddr) -> Option<(SocketAddr, Fingerprint)> {
let mut iter = addr.iter();
let ip = match iter.next()? {
Protocol::Ip4(ip) => IpAddr::from(ip),
Protocol::Ip6(ip) => IpAddr::from(ip),
_ => return None,
};
let port = iter.next()?;
let webrtc = iter.next()?;
let certhash = iter.next()?;
let (port, fingerprint) = match (port, webrtc, certhash) {
(Protocol::Udp(port), Protocol::WebRTC, Protocol::Certhash(cert_hash)) => {
let fingerprint = Fingerprint::try_from_multihash(cert_hash)?;
(port, fingerprint)
}
_ => return None,
};
match iter.next() {
Some(Protocol::P2p(_)) => {}
// peer ID is optional
None => {}
// unexpected protocol
Some(_) => return None,
}
Some((SocketAddr::new(ip, port), fingerprint))
}
// Tests //////////////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
use super::*;
use futures::future::poll_fn;
use libp2p_core::{multiaddr::Protocol, Transport as _};
use rand::thread_rng;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
#[test]
fn missing_webrtc_protocol() {
let addr = "/ip4/127.0.0.1/udp/1234".parse().unwrap();
let maybe_parsed = parse_webrtc_listen_addr(&addr);
assert!(maybe_parsed.is_none());
}
#[test]
fn parse_valid_address_with_certhash_and_p2p() {
let addr = "/ip4/127.0.0.1/udp/39901/webrtc/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/p2p/12D3KooWNpDk9w6WrEEcdsEH1y47W71S36yFjw4sd3j7omzgCSMS"
.parse()
.unwrap();
let maybe_parsed = parse_webrtc_dial_addr(&addr);
assert_eq!(
maybe_parsed,
Some((
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 39901),
Fingerprint::raw(hex_literal::hex!(
"e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb"
))
))
);
}
#[test]
fn peer_id_is_not_required() {
let addr = "/ip4/127.0.0.1/udp/39901/webrtc/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w"
.parse()
.unwrap();
let maybe_parsed = parse_webrtc_dial_addr(&addr);
assert_eq!(
maybe_parsed,
Some((
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 39901),
Fingerprint::raw(hex_literal::hex!(
"e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb"
))
))
);
}
#[test]
fn tcp_is_invalid_protocol() {
let addr = "/ip4/127.0.0.1/tcp/12345/webrtc/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w"
.parse()
.unwrap();
let maybe_parsed = parse_webrtc_listen_addr(&addr);
assert!(maybe_parsed.is_none());
}
#[test]
fn cannot_follow_other_protocols_after_certhash() {
let addr = "/ip4/127.0.0.1/udp/12345/webrtc/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/tcp/12345"
.parse()
.unwrap();
let maybe_parsed = parse_webrtc_listen_addr(&addr);
assert!(maybe_parsed.is_none());
}
#[test]
fn parse_ipv6() {
let addr =
"/ip6/::1/udp/12345/webrtc/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/p2p/12D3KooWNpDk9w6WrEEcdsEH1y47W71S36yFjw4sd3j7omzgCSMS"
.parse()
.unwrap();
let maybe_parsed = parse_webrtc_dial_addr(&addr);
assert_eq!(
maybe_parsed,
Some((
SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 12345),
Fingerprint::raw(hex_literal::hex!(
"e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb"
))
))
);
}
#[test]
fn can_parse_valid_addr_without_certhash() {
let addr = "/ip6/::1/udp/12345/webrtc".parse().unwrap();
let maybe_parsed = parse_webrtc_listen_addr(&addr);
assert_eq!(
maybe_parsed,
Some(SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 12345))
);
}
#[test]
fn fails_to_parse_if_certhash_present_but_wrong_hash_function() {
// We only support SHA2-256 for now but this certhash has been encoded with SHA3-256.
let addr =
"/ip6/::1/udp/12345/webrtc/certhash/uFiCH_tkkzpAwkoIDbE4I7QtQksFMYs5nQ4MyYrkgCJYi4A"
.parse()
.unwrap();
let maybe_addr = parse_webrtc_listen_addr(&addr);
assert!(maybe_addr.is_none())
}
#[tokio::test]
async fn close_listener() {
let id_keys = identity::Keypair::generate_ed25519();
let mut transport =
Transport::new(id_keys, Certificate::generate(&mut thread_rng()).unwrap());
assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx))
.now_or_never()
.is_none());
// Run test twice to check that there is no unexpected behaviour if `QuicTransport.listener`
// is temporarily empty.
for _ in 0..2 {
let listener = transport
.listen_on("/ip4/0.0.0.0/udp/0/webrtc".parse().unwrap())
.unwrap();
match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await {
TransportEvent::NewAddress {
listener_id,
listen_addr,
} => {
assert_eq!(listener_id, listener);
assert!(
matches!(listen_addr.iter().next(), Some(Protocol::Ip4(a)) if !a.is_unspecified())
);
assert!(
matches!(listen_addr.iter().nth(1), Some(Protocol::Udp(port)) if port != 0)
);
assert!(matches!(listen_addr.iter().nth(2), Some(Protocol::WebRTC)));
}
e => panic!("Unexpected event: {:?}", e),
}
assert!(
transport.remove_listener(listener),
"Expect listener to exist."
);
match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await {
TransportEvent::ListenerClosed {
listener_id,
reason: Ok(()),
} => {
assert_eq!(listener_id, listener);
}
e => panic!("Unexpected event: {:?}", e),
}
// Poll once again so that the listener has the chance to return `Poll::Ready(None)` and
// be removed from the list of listeners.
assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx))
.now_or_never()
.is_none());
assert!(transport.listeners.is_empty());
}
}
}

View File

@ -0,0 +1,577 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use async_trait::async_trait;
use futures::{
channel::oneshot,
future::{BoxFuture, FutureExt, OptionFuture},
stream::FuturesUnordered,
StreamExt,
};
use stun::{
attributes::ATTR_USERNAME,
message::{is_message as is_stun_message, Message as STUNMessage},
};
use thiserror::Error;
use tokio::{io::ReadBuf, net::UdpSocket};
use webrtc::ice::udp_mux::{UDPMux, UDPMuxConn, UDPMuxConnParams, UDPMuxWriter};
use webrtc::util::{Conn, Error};
use std::{
collections::{HashMap, HashSet},
io,
io::ErrorKind,
net::SocketAddr,
sync::Arc,
task::{Context, Poll},
};
use crate::tokio::req_res_chan;
const RECEIVE_MTU: usize = 8192;
/// A previously unseen address of a remote which has sent us an ICE binding request.
#[derive(Debug)]
pub struct NewAddr {
pub addr: SocketAddr,
pub ufrag: String,
}
/// An event emitted by [`UDPMuxNewAddr`] when it's polled.
#[derive(Debug)]
pub enum UDPMuxEvent {
/// Connection error. UDP mux should be stopped.
Error(std::io::Error),
/// Got a [`NewAddr`] from the socket.
NewAddr(NewAddr),
}
/// A modified version of [`webrtc::ice::udp_mux::UDPMuxDefault`].
///
/// - It has been rewritten to work without locks and channels instead.
/// - It reports previously unseen addresses instead of ignoring them.
pub struct UDPMuxNewAddr {
udp_sock: UdpSocket,
listen_addr: SocketAddr,
/// Maps from ufrag to the underlying connection.
conns: HashMap<String, UDPMuxConn>,
/// Maps from socket address to the underlying connection.
address_map: HashMap<SocketAddr, UDPMuxConn>,
/// Set of the new addresses to avoid sending the same address multiple times.
new_addrs: HashSet<SocketAddr>,
/// `true` when UDP mux is closed.
is_closed: bool,
send_buffer: Option<(Vec<u8>, SocketAddr, oneshot::Sender<Result<usize, Error>>)>,
close_futures: FuturesUnordered<BoxFuture<'static, ()>>,
write_future: OptionFuture<BoxFuture<'static, ()>>,
close_command: req_res_chan::Receiver<(), Result<(), Error>>,
get_conn_command: req_res_chan::Receiver<String, Result<Arc<dyn Conn + Send + Sync>, Error>>,
remove_conn_command: req_res_chan::Receiver<String, ()>,
registration_command: req_res_chan::Receiver<(UDPMuxConn, SocketAddr), ()>,
send_command: req_res_chan::Receiver<(Vec<u8>, SocketAddr), Result<usize, Error>>,
udp_mux_handle: Arc<UdpMuxHandle>,
udp_mux_writer_handle: Arc<UdpMuxWriterHandle>,
}
impl UDPMuxNewAddr {
pub fn listen_on(addr: SocketAddr) -> Result<Self, io::Error> {
let std_sock = std::net::UdpSocket::bind(addr)?;
std_sock.set_nonblocking(true)?;
let tokio_socket = UdpSocket::from_std(std_sock)?;
let listen_addr = tokio_socket.local_addr()?;
let (udp_mux_handle, close_command, get_conn_command, remove_conn_command) =
UdpMuxHandle::new();
let (udp_mux_writer_handle, registration_command, send_command) = UdpMuxWriterHandle::new();
Ok(Self {
udp_sock: tokio_socket,
listen_addr,
conns: HashMap::default(),
address_map: HashMap::default(),
new_addrs: HashSet::default(),
is_closed: false,
send_buffer: None,
close_futures: FuturesUnordered::default(),
write_future: OptionFuture::default(),
close_command,
get_conn_command,
remove_conn_command,
registration_command,
send_command,
udp_mux_handle: Arc::new(udp_mux_handle),
udp_mux_writer_handle: Arc::new(udp_mux_writer_handle),
})
}
pub fn listen_addr(&self) -> SocketAddr {
self.listen_addr
}
pub fn udp_mux_handle(&self) -> Arc<UdpMuxHandle> {
self.udp_mux_handle.clone()
}
/// Create a muxed connection for a given ufrag.
fn create_muxed_conn(&self, ufrag: &str) -> Result<UDPMuxConn, Error> {
let local_addr = self.udp_sock.local_addr()?;
let params = UDPMuxConnParams {
local_addr,
key: ufrag.into(),
udp_mux: Arc::downgrade(
&(self.udp_mux_writer_handle.clone() as Arc<dyn UDPMuxWriter + Send + Sync>),
),
};
Ok(UDPMuxConn::new(params))
}
/// Returns a muxed connection if the `ufrag` from the given STUN message matches an existing
/// connection.
fn conn_from_stun_message(
&self,
buffer: &[u8],
addr: &SocketAddr,
) -> Option<Result<UDPMuxConn, ConnQueryError>> {
match ufrag_from_stun_message(buffer, true) {
Ok(ufrag) => {
if let Some(conn) = self.conns.get(&ufrag) {
let associated_addrs = conn.get_addresses();
// This basically ensures only one address is registered per ufrag.
if associated_addrs.is_empty() || associated_addrs.contains(addr) {
return Some(Ok(conn.clone()));
} else {
return Some(Err(ConnQueryError::UfragAlreadyTaken { associated_addrs }));
}
}
None
}
Err(e) => {
log::debug!("{} (addr={})", e, addr);
None
}
}
}
/// Reads from the underlying UDP socket and either reports a new address or proxies data to the
/// muxed connection.
pub fn poll(&mut self, cx: &mut Context) -> Poll<UDPMuxEvent> {
let mut recv_buf = [0u8; RECEIVE_MTU];
loop {
// => Send data to target
match self.send_buffer.take() {
None => {
if let Poll::Ready(Some(((buf, target), response))) =
self.send_command.poll_next_unpin(cx)
{
self.send_buffer = Some((buf, target, response));
continue;
}
}
Some((buf, target, response)) => {
match self.udp_sock.poll_send_to(cx, &buf, target) {
Poll::Ready(result) => {
let _ = response.send(result.map_err(|e| Error::Io(e.into())));
continue;
}
Poll::Pending => {
self.send_buffer = Some((buf, target, response));
}
}
}
}
// => Register a new connection
if let Poll::Ready(Some(((conn, addr), response))) =
self.registration_command.poll_next_unpin(cx)
{
let key = conn.key();
self.address_map
.entry(addr)
.and_modify(|e| {
if e.key() != key {
e.remove_address(&addr);
*e = conn.clone();
}
})
.or_insert_with(|| conn.clone());
// remove addr from new_addrs once conn is established
self.new_addrs.remove(&addr);
let _ = response.send(());
continue;
}
// => Get connection with the given ufrag
if let Poll::Ready(Some((ufrag, response))) = self.get_conn_command.poll_next_unpin(cx)
{
if self.is_closed {
let _ = response.send(Err(Error::ErrUseClosedNetworkConn));
continue;
}
if let Some(conn) = self.conns.get(&ufrag).cloned() {
let _ = response.send(Ok(Arc::new(conn)));
continue;
}
let muxed_conn = match self.create_muxed_conn(&ufrag) {
Ok(conn) => conn,
Err(e) => {
let _ = response.send(Err(e));
continue;
}
};
let mut close_rx = muxed_conn.close_rx();
self.close_futures.push({
let ufrag = ufrag.clone();
let udp_mux_handle = self.udp_mux_handle.clone();
Box::pin(async move {
let _ = close_rx.changed().await;
udp_mux_handle.remove_conn_by_ufrag(&ufrag).await;
})
});
self.conns.insert(ufrag, muxed_conn.clone());
let _ = response.send(Ok(Arc::new(muxed_conn) as Arc<dyn Conn + Send + Sync>));
continue;
}
// => Close UDPMux
if let Poll::Ready(Some(((), response))) = self.close_command.poll_next_unpin(cx) {
if self.is_closed {
let _ = response.send(Err(Error::ErrAlreadyClosed));
continue;
}
for (_, conn) in self.conns.drain() {
conn.close();
}
// NOTE: This is important, we need to drop all instances of `UDPMuxConn` to
// avoid a retain cycle due to the use of [`std::sync::Arc`] on both sides.
self.address_map.clear();
// NOTE: This is important, we need to drop all instances of `UDPMuxConn` to
// avoid a retain cycle due to the use of [`std::sync::Arc`] on both sides.
self.new_addrs.clear();
let _ = response.send(Ok(()));
self.is_closed = true;
continue;
}
// => Remove connection with the given ufrag
if let Poll::Ready(Some((ufrag, response))) =
self.remove_conn_command.poll_next_unpin(cx)
{
// Pion's ice implementation has both `RemoveConnByFrag` and `RemoveConn`, but since `conns`
// is keyed on `ufrag` their implementation is equivalent.
if let Some(removed_conn) = self.conns.remove(&ufrag) {
for address in removed_conn.get_addresses() {
self.address_map.remove(&address);
}
}
let _ = response.send(());
continue;
}
// => Remove closed connections
let _ = self.close_futures.poll_next_unpin(cx);
// => Write previously received data to local connections
match self.write_future.poll_unpin(cx) {
Poll::Ready(Some(())) => {
self.write_future = OptionFuture::default();
continue;
}
Poll::Ready(None) => {
// => Read from the socket
let mut read = ReadBuf::new(&mut recv_buf);
match self.udp_sock.poll_recv_from(cx, &mut read) {
Poll::Ready(Ok(addr)) => {
// Find connection based on previously having seen this source address
let conn = self.address_map.get(&addr);
let conn = match conn {
// If we couldn't find the connection based on source address, see if
// this is a STUN mesage and if so if we can find the connection based on ufrag.
None if is_stun_message(read.filled()) => {
match self.conn_from_stun_message(read.filled(), &addr) {
Some(Ok(s)) => Some(s),
Some(Err(e)) => {
log::debug!("addr={}: Error when querying existing connections: {}", &addr, e);
continue;
}
None => None,
}
}
Some(s) => Some(s.to_owned()),
_ => None,
};
match conn {
None => {
if !self.new_addrs.contains(&addr) {
match ufrag_from_stun_message(read.filled(), false) {
Ok(ufrag) => {
log::trace!(
"Notifying about new address addr={} from ufrag={}",
&addr,
ufrag
);
self.new_addrs.insert(addr);
return Poll::Ready(UDPMuxEvent::NewAddr(
NewAddr { addr, ufrag },
));
}
Err(e) => {
log::debug!(
"Unknown address addr={} (non STUN packet: {})",
&addr,
e
);
}
}
}
}
Some(conn) => {
let mut packet = vec![0u8; read.filled().len()];
packet.copy_from_slice(read.filled());
self.write_future = OptionFuture::from(Some(
async move {
if let Err(err) = conn.write_packet(&packet, addr).await
{
log::error!(
"Failed to write packet: {} (addr={})",
err,
addr
);
}
}
.boxed(),
));
}
}
continue;
}
Poll::Ready(Err(err)) if err.kind() == ErrorKind::TimedOut => {}
Poll::Pending => {}
Poll::Ready(Err(err)) => {
log::error!("Could not read udp packet: {}", err);
return Poll::Ready(UDPMuxEvent::Error(err));
}
}
}
Poll::Pending => {}
}
return Poll::Pending;
}
}
}
/// Handle which utilizes [`req_res_chan`] to transmit commands (e.g. remove connection) from the
/// WebRTC ICE agent to [`UDPMuxNewAddr::poll`].
pub struct UdpMuxHandle {
close_sender: req_res_chan::Sender<(), Result<(), Error>>,
get_conn_sender: req_res_chan::Sender<String, Result<Arc<dyn Conn + Send + Sync>, Error>>,
remove_sender: req_res_chan::Sender<String, ()>,
}
impl UdpMuxHandle {
/// Returns a new `UdpMuxHandle` and `close`, `get_conn` and `remove` receivers.
pub fn new() -> (
Self,
req_res_chan::Receiver<(), Result<(), Error>>,
req_res_chan::Receiver<String, Result<Arc<dyn Conn + Send + Sync>, Error>>,
req_res_chan::Receiver<String, ()>,
) {
let (sender1, receiver1) = req_res_chan::new(1);
let (sender2, receiver2) = req_res_chan::new(1);
let (sender3, receiver3) = req_res_chan::new(1);
let this = Self {
close_sender: sender1,
get_conn_sender: sender2,
remove_sender: sender3,
};
(this, receiver1, receiver2, receiver3)
}
}
#[async_trait]
impl UDPMux for UdpMuxHandle {
async fn close(&self) -> Result<(), Error> {
self.close_sender
.send(())
.await
.map_err(|e| Error::Io(e.into()))??;
Ok(())
}
async fn get_conn(self: Arc<Self>, ufrag: &str) -> Result<Arc<dyn Conn + Send + Sync>, Error> {
let conn = self
.get_conn_sender
.send(ufrag.to_owned())
.await
.map_err(|e| Error::Io(e.into()))??;
Ok(conn)
}
async fn remove_conn_by_ufrag(&self, ufrag: &str) {
if let Err(e) = self.remove_sender.send(ufrag.to_owned()).await {
log::debug!("Failed to send message through channel: {:?}", e);
}
}
}
/// Handle which utilizes [`req_res_chan`] to transmit commands from [`UDPMuxConn`] connections to
/// [`UDPMuxNewAddr::poll`].
pub struct UdpMuxWriterHandle {
registration_channel: req_res_chan::Sender<(UDPMuxConn, SocketAddr), ()>,
send_channel: req_res_chan::Sender<(Vec<u8>, SocketAddr), Result<usize, Error>>,
}
impl UdpMuxWriterHandle {
/// Returns a new `UdpMuxWriterHandle` and `registration`, `send` receivers.
fn new() -> (
Self,
req_res_chan::Receiver<(UDPMuxConn, SocketAddr), ()>,
req_res_chan::Receiver<(Vec<u8>, SocketAddr), Result<usize, Error>>,
) {
let (sender1, receiver1) = req_res_chan::new(1);
let (sender2, receiver2) = req_res_chan::new(1);
let this = Self {
registration_channel: sender1,
send_channel: sender2,
};
(this, receiver1, receiver2)
}
}
#[async_trait]
impl UDPMuxWriter for UdpMuxWriterHandle {
async fn register_conn_for_address(&self, conn: &UDPMuxConn, addr: SocketAddr) {
match self
.registration_channel
.send((conn.to_owned(), addr))
.await
{
Ok(()) => {}
Err(e) => {
log::debug!("Failed to send message through channel: {:?}", e);
return;
}
}
log::debug!("Registered {} for {}", addr, conn.key());
}
async fn send_to(&self, buf: &[u8], target: &SocketAddr) -> Result<usize, Error> {
let bytes_written = self
.send_channel
.send((buf.to_owned(), target.to_owned()))
.await
.map_err(|e| Error::Io(e.into()))??;
Ok(bytes_written)
}
}
/// Gets the ufrag from the given STUN message or returns an error, if failed to decode or the
/// username attribute is not present.
fn ufrag_from_stun_message(buffer: &[u8], local_ufrag: bool) -> Result<String, Error> {
let (result, message) = {
let mut m = STUNMessage::new();
(m.unmarshal_binary(buffer), m)
};
if let Err(err) = result {
Err(Error::Other(format!(
"failed to handle decode ICE: {}",
err
)))
} else {
let (attr, found) = message.attributes.get(ATTR_USERNAME);
if !found {
return Err(Error::Other("no username attribute in STUN message".into()));
}
match String::from_utf8(attr.value) {
// Per the RFC this shouldn't happen
// https://datatracker.ietf.org/doc/html/rfc5389#section-15.3
Err(err) => Err(Error::Other(format!(
"failed to decode USERNAME from STUN message as UTF-8: {}",
err
))),
Ok(s) => {
// s is a combination of the local_ufrag and the remote ufrag separated by `:`.
let res = if local_ufrag {
s.split(':').next()
} else {
s.split(':').last()
};
match res {
Some(s) => Ok(s.to_owned()),
None => Err(Error::Other("can't get ufrag from username".into())),
}
}
}
}
}
#[derive(Error, Debug)]
enum ConnQueryError {
#[error("ufrag is already taken (associated_addrs={associated_addrs:?})")]
UfragAlreadyTaken { associated_addrs: Vec<SocketAddr> },
}

View File

@ -0,0 +1,240 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod noise;
use futures::channel::oneshot;
use futures::future::Either;
use futures_timer::Delay;
use libp2p_core::{identity, PeerId};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use webrtc::api::setting_engine::SettingEngine;
use webrtc::api::APIBuilder;
use webrtc::data::data_channel::DataChannel;
use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
use webrtc::dtls_transport::dtls_role::DTLSRole;
use webrtc::ice::network_type::NetworkType;
use webrtc::ice::udp_mux::UDPMux;
use webrtc::ice::udp_network::UDPNetwork;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::RTCPeerConnection;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use crate::tokio::{error::Error, fingerprint::Fingerprint, sdp, substream::Substream, Connection};
/// Creates a new outbound WebRTC connection.
pub async fn outbound(
addr: SocketAddr,
config: RTCConfiguration,
udp_mux: Arc<dyn UDPMux + Send + Sync>,
client_fingerprint: Fingerprint,
server_fingerprint: Fingerprint,
id_keys: identity::Keypair,
) -> Result<(PeerId, Connection), Error> {
log::debug!("new outbound connection to {addr})");
let (peer_connection, ufrag) = new_outbound_connection(addr, config, udp_mux).await?;
let offer = peer_connection.create_offer(None).await?;
log::debug!("created SDP offer for outbound connection: {:?}", offer.sdp);
peer_connection.set_local_description(offer).await?;
let answer = sdp::answer(addr, &server_fingerprint, &ufrag);
log::debug!(
"calculated SDP answer for outbound connection: {:?}",
answer
);
peer_connection.set_remote_description(answer).await?; // This will start the gathering of ICE candidates.
let data_channel = create_substream_for_noise_handshake(&peer_connection).await?;
let peer_id = noise::outbound(
id_keys,
data_channel,
server_fingerprint,
client_fingerprint,
)
.await?;
Ok((peer_id, Connection::new(peer_connection).await))
}
/// Creates a new inbound WebRTC connection.
pub async fn inbound(
addr: SocketAddr,
config: RTCConfiguration,
udp_mux: Arc<dyn UDPMux + Send + Sync>,
server_fingerprint: Fingerprint,
remote_ufrag: String,
id_keys: identity::Keypair,
) -> Result<(PeerId, Connection), Error> {
log::debug!("new inbound connection from {addr} (ufrag: {remote_ufrag})");
let peer_connection = new_inbound_connection(addr, config, udp_mux, &remote_ufrag).await?;
let offer = sdp::offer(addr, &remote_ufrag);
log::debug!("calculated SDP offer for inbound connection: {:?}", offer);
peer_connection.set_remote_description(offer).await?;
let answer = peer_connection.create_answer(None).await?;
log::debug!("created SDP answer for inbound connection: {:?}", answer);
peer_connection.set_local_description(answer).await?; // This will start the gathering of ICE candidates.
let data_channel = create_substream_for_noise_handshake(&peer_connection).await?;
let client_fingerprint = get_remote_fingerprint(&peer_connection).await;
let peer_id = noise::inbound(
id_keys,
data_channel,
client_fingerprint,
server_fingerprint,
)
.await?;
Ok((peer_id, Connection::new(peer_connection).await))
}
async fn new_outbound_connection(
addr: SocketAddr,
config: RTCConfiguration,
udp_mux: Arc<dyn UDPMux + Send + Sync>,
) -> Result<(RTCPeerConnection, String), Error> {
let ufrag = random_ufrag();
let se = setting_engine(udp_mux, &ufrag, addr);
let connection = APIBuilder::new()
.with_setting_engine(se)
.build()
.new_peer_connection(config)
.await?;
Ok((connection, ufrag))
}
async fn new_inbound_connection(
addr: SocketAddr,
config: RTCConfiguration,
udp_mux: Arc<dyn UDPMux + Send + Sync>,
ufrag: &str,
) -> Result<RTCPeerConnection, Error> {
let mut se = setting_engine(udp_mux, ufrag, addr);
{
se.set_lite(true);
se.disable_certificate_fingerprint_verification(true);
// Act as a DTLS server (one which waits for a connection).
//
// NOTE: removing this seems to break DTLS setup (both sides send `ClientHello` messages,
// but none end up responding).
se.set_answering_dtls_role(DTLSRole::Server)?;
}
let connection = APIBuilder::new()
.with_setting_engine(se)
.build()
.new_peer_connection(config)
.await?;
Ok(connection)
}
/// Generates a random ufrag and adds a prefix according to the spec.
fn random_ufrag() -> String {
format!(
"libp2p+webrtc+v1/{}",
thread_rng()
.sample_iter(&Alphanumeric)
.take(64)
.map(char::from)
.collect::<String>()
)
}
fn setting_engine(
udp_mux: Arc<dyn UDPMux + Send + Sync>,
ufrag: &str,
addr: SocketAddr,
) -> SettingEngine {
let mut se = SettingEngine::default();
// Set both ICE user and password to our fingerprint because that's what the client is
// expecting..
se.set_ice_credentials(ufrag.to_owned(), ufrag.to_owned());
se.set_udp_network(UDPNetwork::Muxed(udp_mux.clone()));
// Allow detaching data channels.
se.detach_data_channels();
// Set the desired network type.
//
// NOTE: if not set, a [`webrtc_ice::agent::Agent`] might pick a wrong local candidate
// (e.g. IPv6 `[::1]` while dialing an IPv4 `10.11.12.13`).
let network_type = match addr {
SocketAddr::V4(_) => NetworkType::Udp4,
SocketAddr::V6(_) => NetworkType::Udp6,
};
se.set_network_types(vec![network_type]);
se
}
/// Returns the SHA-256 fingerprint of the remote.
async fn get_remote_fingerprint(conn: &RTCPeerConnection) -> Fingerprint {
let cert_bytes = conn.sctp().transport().get_remote_certificate().await;
Fingerprint::from_certificate(&cert_bytes)
}
async fn create_substream_for_noise_handshake(
conn: &RTCPeerConnection,
) -> Result<Substream, Error> {
// NOTE: the data channel w/ `negotiated` flag set to `true` MUST be created on both ends.
let data_channel = conn
.create_data_channel(
"",
Some(RTCDataChannelInit {
negotiated: Some(0), // 0 is reserved for the Noise substream
..RTCDataChannelInit::default()
}),
)
.await?;
let (tx, rx) = oneshot::channel::<Arc<DataChannel>>();
// Wait until the data channel is opened and detach it.
crate::tokio::connection::register_data_channel_open_handler(data_channel, tx).await;
let channel = match futures::future::select(rx, Delay::new(Duration::from_secs(10))).await {
Either::Left((Ok(channel), _)) => channel,
Either::Left((Err(_), _)) => {
return Err(Error::Internal("failed to open data channel".to_owned()))
}
Either::Right(((), _)) => {
return Err(Error::Internal(
"data channel opening took longer than 10 seconds (see logs)".into(),
))
}
};
let (substream, drop_listener) = Substream::new(channel);
drop(drop_listener); // Don't care about cancelled substreams during initial handshake.
Ok(substream)
}

View File

@ -0,0 +1,113 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
use libp2p_core::{identity, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo};
use libp2p_noise::{Keypair, NoiseConfig, X25519Spec};
use crate::tokio::fingerprint::Fingerprint;
use crate::tokio::Error;
pub async fn inbound<T>(
id_keys: identity::Keypair,
stream: T,
client_fingerprint: Fingerprint,
server_fingerprint: Fingerprint,
) -> Result<PeerId, Error>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let dh_keys = Keypair::<X25519Spec>::new()
.into_authentic(&id_keys)
.unwrap();
let noise = NoiseConfig::xx(dh_keys)
.with_prologue(noise_prologue(client_fingerprint, server_fingerprint));
let info = noise.protocol_info().next().unwrap();
// Note the roles are reversed because it allows the server (webrtc connection responder) to
// send application data 0.5 RTT earlier.
let (peer_id, mut channel) = noise
.into_authenticated()
.upgrade_outbound(stream, info)
.await?;
channel.close().await?;
Ok(peer_id)
}
pub async fn outbound<T>(
id_keys: identity::Keypair,
stream: T,
server_fingerprint: Fingerprint,
client_fingerprint: Fingerprint,
) -> Result<PeerId, Error>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let dh_keys = Keypair::<X25519Spec>::new()
.into_authentic(&id_keys)
.unwrap();
let noise = NoiseConfig::xx(dh_keys)
.with_prologue(noise_prologue(client_fingerprint, server_fingerprint));
let info = noise.protocol_info().next().unwrap();
// Note the roles are reversed because it allows the server (webrtc connection responder) to
// send application data 0.5 RTT earlier.
let (peer_id, mut channel) = noise
.into_authenticated()
.upgrade_inbound(stream, info)
.await?;
channel.close().await?;
Ok(peer_id)
}
pub fn noise_prologue(client_fingerprint: Fingerprint, server_fingerprint: Fingerprint) -> Vec<u8> {
let client = client_fingerprint.to_multihash().to_bytes();
let server = server_fingerprint.to_multihash().to_bytes();
const PREFIX: &[u8] = b"libp2p-webrtc-noise:";
let mut out = Vec::with_capacity(PREFIX.len() + client.len() + server.len());
out.extend_from_slice(PREFIX);
out.extend_from_slice(&client);
out.extend_from_slice(&server);
out
}
#[cfg(test)]
mod tests {
use super::*;
use hex_literal::hex;
#[test]
fn noise_prologue_tests() {
let a = Fingerprint::raw(hex!(
"3e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870"
));
let b = Fingerprint::raw(hex!(
"30fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b99"
));
let prologue1 = noise_prologue(a, b);
let prologue2 = noise_prologue(b, a);
assert_eq!(hex::encode(&prologue1), "6c69627032702d7765627274632d6e6f6973653a12203e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870122030fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b99");
assert_eq!(hex::encode(&prologue2), "6c69627032702d7765627274632d6e6f6973653a122030fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b9912203e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870");
}
}

View File

@ -0,0 +1,486 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use anyhow::Result;
use async_trait::async_trait;
use futures::{
future::{select, Either, FutureExt},
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
stream::StreamExt,
};
use libp2p::core::{identity, muxing::StreamMuxerBox, upgrade, Transport as _};
use libp2p::request_response::{
ProtocolName, ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseConfig,
RequestResponseEvent, RequestResponseMessage,
};
use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::webrtc::tokio as webrtc;
use rand::{thread_rng, RngCore};
use std::{io, iter};
#[tokio::test]
async fn smoke() -> Result<()> {
let _ = env_logger::builder().is_test(true).try_init();
let mut rng = rand::thread_rng();
let mut a = create_swarm()?;
let mut b = create_swarm()?;
Swarm::listen_on(&mut a, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?;
Swarm::listen_on(&mut b, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?;
let addr = match a.next().await {
Some(SwarmEvent::NewListenAddr { address, .. }) => address,
e => panic!("{:?}", e),
};
// skip other interface addresses
while a.next().now_or_never().is_some() {}
let _ = match b.next().await {
Some(SwarmEvent::NewListenAddr { address, .. }) => address,
e => panic!("{:?}", e),
};
// skip other interface addresses
while b.next().now_or_never().is_some() {}
let mut data = vec![0; 4096];
rng.fill_bytes(&mut data);
b.behaviour_mut()
.add_address(Swarm::local_peer_id(&a), addr);
b.behaviour_mut()
.send_request(Swarm::local_peer_id(&a), Ping(data.clone()));
match b.next().await {
Some(SwarmEvent::Dialing(_)) => {}
e => panic!("{:?}", e),
}
let pair = select(a.next(), b.next());
match pair.await {
Either::Left((Some(SwarmEvent::IncomingConnection { .. }), _)) => {}
Either::Left((e, _)) => panic!("{:?}", e),
Either::Right(_) => panic!("b completed first"),
}
let pair = select(a.next(), b.next());
match pair.await {
Either::Left((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {}
Either::Left((e, _)) => panic!("{:?}", e),
Either::Right((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {}
Either::Right((e, _)) => panic!("{:?}", e),
}
let pair = select(a.next(), b.next());
match pair.await {
Either::Left((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {}
Either::Left((e, _)) => panic!("{:?}", e),
Either::Right((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {}
Either::Right((e, _)) => panic!("{:?}", e),
}
assert!(b.next().now_or_never().is_none());
let pair = select(a.next(), b.next());
match pair.await {
Either::Left((
Some(SwarmEvent::Behaviour(RequestResponseEvent::Message {
message:
RequestResponseMessage::Request {
request: Ping(ping),
channel,
..
},
..
})),
_,
)) => {
a.behaviour_mut()
.send_response(channel, Pong(ping))
.unwrap();
}
Either::Left((e, _)) => panic!("{:?}", e),
Either::Right(_) => panic!("b completed first"),
}
match a.next().await {
Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => {}
e => panic!("{:?}", e),
}
let pair = select(a.next(), b.next());
match pair.await {
Either::Right((
Some(SwarmEvent::Behaviour(RequestResponseEvent::Message {
message:
RequestResponseMessage::Response {
response: Pong(pong),
..
},
..
})),
_,
)) => assert_eq!(data, pong),
Either::Right((e, _)) => panic!("{:?}", e),
Either::Left(_) => panic!("a completed first"),
}
a.behaviour_mut().send_request(
Swarm::local_peer_id(&b),
Ping(b"another substream".to_vec()),
);
assert!(a.next().now_or_never().is_none());
let pair = select(a.next(), b.next());
match pair.await {
Either::Right((
Some(SwarmEvent::Behaviour(RequestResponseEvent::Message {
message:
RequestResponseMessage::Request {
request: Ping(data),
channel,
..
},
..
})),
_,
)) => {
b.behaviour_mut()
.send_response(channel, Pong(data))
.unwrap();
}
Either::Right((e, _)) => panic!("{:?}", e),
Either::Left(_) => panic!("a completed first"),
}
match b.next().await {
Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => {}
e => panic!("{:?}", e),
}
let pair = select(a.next(), b.next());
match pair.await {
Either::Left((
Some(SwarmEvent::Behaviour(RequestResponseEvent::Message {
message:
RequestResponseMessage::Response {
response: Pong(data),
..
},
..
})),
_,
)) => assert_eq!(data, b"another substream".to_vec()),
Either::Left((e, _)) => panic!("{:?}", e),
Either::Right(_) => panic!("b completed first"),
}
Ok(())
}
#[tokio::test]
async fn dial_failure() -> Result<()> {
let _ = env_logger::builder().is_test(true).try_init();
let mut a = create_swarm()?;
let mut b = create_swarm()?;
Swarm::listen_on(&mut a, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?;
Swarm::listen_on(&mut b, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?;
let addr = match a.next().await {
Some(SwarmEvent::NewListenAddr { address, .. }) => address,
e => panic!("{:?}", e),
};
// skip other interface addresses
while a.next().now_or_never().is_some() {}
let _ = match b.next().await {
Some(SwarmEvent::NewListenAddr { address, .. }) => address,
e => panic!("{:?}", e),
};
// skip other interface addresses
while b.next().now_or_never().is_some() {}
let a_peer_id = &Swarm::local_peer_id(&a).clone();
drop(a); // stop a swarm so b can never reach it
b.behaviour_mut().add_address(a_peer_id, addr);
b.behaviour_mut()
.send_request(a_peer_id, Ping(b"hello world".to_vec()));
match b.next().await {
Some(SwarmEvent::Dialing(_)) => {}
e => panic!("{:?}", e),
}
match b.next().await {
Some(SwarmEvent::OutgoingConnectionError { .. }) => {}
e => panic!("{:?}", e),
};
match b.next().await {
Some(SwarmEvent::Behaviour(RequestResponseEvent::OutboundFailure { .. })) => {}
e => panic!("{:?}", e),
};
Ok(())
}
#[tokio::test]
async fn concurrent_connections_and_streams() {
let _ = env_logger::builder().is_test(true).try_init();
let num_listeners = 3usize;
let num_streams = 8usize;
let mut data = vec![0; 4096];
rand::thread_rng().fill_bytes(&mut data);
let mut listeners = vec![];
// Spawn the listener nodes.
for _ in 0..num_listeners {
let mut listener = create_swarm().unwrap();
Swarm::listen_on(
&mut listener,
"/ip4/127.0.0.1/udp/0/webrtc".parse().unwrap(),
)
.unwrap();
// Wait to listen on address.
let addr = match listener.next().await {
Some(SwarmEvent::NewListenAddr { address, .. }) => address,
e => panic!("{:?}", e),
};
listeners.push((*listener.local_peer_id(), addr));
tokio::spawn(async move {
loop {
match listener.next().await {
Some(SwarmEvent::IncomingConnection { .. }) => {
log::debug!("listener IncomingConnection");
}
Some(SwarmEvent::ConnectionEstablished { .. }) => {
log::debug!("listener ConnectionEstablished");
}
Some(SwarmEvent::Behaviour(RequestResponseEvent::Message {
message:
RequestResponseMessage::Request {
request: Ping(ping),
channel,
..
},
..
})) => {
log::debug!("listener got Message");
listener
.behaviour_mut()
.send_response(channel, Pong(ping))
.unwrap();
}
Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => {
log::debug!("listener ResponseSent");
}
Some(SwarmEvent::ConnectionClosed { .. }) => {}
Some(SwarmEvent::NewListenAddr { .. }) => {
log::debug!("listener NewListenAddr");
}
Some(e) => {
panic!("unexpected event {:?}", e);
}
None => {
panic!("listener stopped");
}
}
}
});
}
let mut dialer = create_swarm().unwrap();
Swarm::listen_on(&mut dialer, "/ip4/127.0.0.1/udp/0/webrtc".parse().unwrap()).unwrap();
// Wait to listen on address.
match dialer.next().await {
Some(SwarmEvent::NewListenAddr { address, .. }) => address,
e => panic!("{:?}", e),
};
// For each listener node start `number_streams` requests.
for (listener_peer_id, listener_addr) in &listeners {
dialer
.behaviour_mut()
.add_address(listener_peer_id, listener_addr.clone());
dialer.dial(*listener_peer_id).unwrap();
}
// Wait for responses to each request.
let mut num_responses = 0;
loop {
match dialer.next().await {
Some(SwarmEvent::Dialing(_)) => {
log::debug!("dialer Dialing");
}
Some(SwarmEvent::ConnectionEstablished { peer_id, .. }) => {
log::debug!("dialer Connection established");
for _ in 0..num_streams {
dialer
.behaviour_mut()
.send_request(&peer_id, Ping(data.clone()));
}
}
Some(SwarmEvent::Behaviour(RequestResponseEvent::Message {
message:
RequestResponseMessage::Response {
response: Pong(pong),
..
},
..
})) => {
log::debug!("dialer got Message");
num_responses += 1;
assert_eq!(data, pong);
let should_be = num_listeners * num_streams;
log::debug!(
"num of responses: {}, num of listeners * num of streams: {}",
num_responses,
should_be
);
if num_responses == should_be {
break;
}
}
Some(SwarmEvent::ConnectionClosed { .. }) => {
log::debug!("dialer ConnectionClosed");
}
Some(SwarmEvent::NewListenAddr { .. }) => {
log::debug!("dialer NewListenAddr");
}
e => {
panic!("unexpected event {:?}", e);
}
}
}
}
#[derive(Debug, Clone)]
struct PingProtocol();
#[derive(Clone)]
struct PingCodec();
#[derive(Debug, Clone, PartialEq, Eq)]
struct Ping(Vec<u8>);
#[derive(Debug, Clone, PartialEq, Eq)]
struct Pong(Vec<u8>);
impl ProtocolName for PingProtocol {
fn protocol_name(&self) -> &[u8] {
"/ping/1".as_bytes()
}
}
#[async_trait]
impl RequestResponseCodec for PingCodec {
type Protocol = PingProtocol;
type Request = Ping;
type Response = Pong;
async fn read_request<T>(&mut self, _: &PingProtocol, io: &mut T) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
upgrade::read_length_prefixed(io, 4096)
.map(|res| match res {
Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()),
Ok(vec) => Ok(Ping(vec)),
})
.await
}
async fn read_response<T>(&mut self, _: &PingProtocol, io: &mut T) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
upgrade::read_length_prefixed(io, 4096)
.map(|res| match res {
Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()),
Ok(vec) => Ok(Pong(vec)),
})
.await
}
async fn write_request<T>(
&mut self,
_: &PingProtocol,
io: &mut T,
Ping(data): Ping,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
upgrade::write_length_prefixed(io, data).await?;
io.close().await?;
Ok(())
}
async fn write_response<T>(
&mut self,
_: &PingProtocol,
io: &mut T,
Pong(data): Pong,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
upgrade::write_length_prefixed(io, data).await?;
io.close().await?;
Ok(())
}
}
fn create_swarm() -> Result<Swarm<RequestResponse<PingCodec>>> {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = id_keys.public().to_peer_id();
let transport = webrtc::Transport::new(
id_keys,
webrtc::Certificate::generate(&mut thread_rng()).unwrap(),
);
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
let cfg = RequestResponseConfig::default();
let behaviour = RequestResponse::new(PingCodec(), protocols, cfg);
let transport = transport
.map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn)))
.boxed();
Ok(Swarm::with_tokio_executor(transport, behaviour, peer_id))
}