feat(webrtc): add WebRTC for WASM environments

This PR implements `Transport` for WebRTC for browsers by using web-sys. Only the `webrtc-direct` spec is implemented. The `webrtc` spec for connecting two browsers with each other is left to a future PR.

Related: https://github.com/libp2p/specs/issues/475.
Related #2617.
Supersedes: #4229.

Pull-Request: #4248.

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
This commit is contained in:
Doug A
2023-09-17 16:13:11 -03:00
committed by GitHub
parent 508cad1f0d
commit f5e644da8f
49 changed files with 2192 additions and 412 deletions

View File

@ -0,0 +1,6 @@
## 0.1.0 - unreleased
- Initial release.
See [PR 4248].
[PR 4248]: https://github.com/libp2p/rust-libp2p/pull/4248

View File

@ -0,0 +1,32 @@
[package]
authors = ["Doug Anderson <DougAnderson444@peerpiper.io>"]
categories = ["network-programming"]
description = "Utilities for WebRTC in libp2p"
edition = "2021"
license = "MIT"
name = "libp2p-webrtc-utils"
repository = "https://github.com/libp2p/rust-libp2p"
rust-version = { workspace = true }
version = "0.1.0"
publish = false # TEMP fix for https://github.com/obi1kenobi/cargo-semver-checks-action/issues/53.
[dependencies]
bytes = "1"
futures = "0.3"
hex = "0.4"
libp2p-core = { workspace = true }
libp2p-identity = { workspace = true }
libp2p-noise = { workspace = true }
log = "0.4.19"
quick-protobuf = "0.8"
quick-protobuf-codec = { workspace = true }
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
sha2 = "0.10.7"
thiserror = "1"
tinytemplate = "1.2"
asynchronous-codec = "0.6"
[dev-dependencies]
hex-literal = "0.4"
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }

View File

@ -0,0 +1,109 @@
// Copyright 2023 Doug Anderson.
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use libp2p_core::multihash;
use sha2::Digest as _;
use std::fmt;
pub const SHA256: &str = "sha-256";
const MULTIHASH_SHA256_CODE: u64 = 0x12;
type Multihash = multihash::Multihash<64>;
/// A certificate fingerprint that is assumed to be created using the SHA256 hash algorithm.
#[derive(Eq, PartialEq, Copy, Clone)]
pub struct Fingerprint([u8; 32]);
impl Fingerprint {
pub const FF: Fingerprint = Fingerprint([0xFF; 32]);
pub const fn raw(digest: [u8; 32]) -> Self {
Fingerprint(digest)
}
/// Creates a new [Fingerprint] from a raw certificate by hashing the given bytes with SHA256.
pub fn from_certificate(bytes: &[u8]) -> Self {
Fingerprint(sha2::Sha256::digest(bytes).into())
}
/// Converts [`Multihash`](multihash::Multihash) to [`Fingerprint`].
pub fn try_from_multihash(hash: Multihash) -> Option<Self> {
if hash.code() != MULTIHASH_SHA256_CODE {
// Only support SHA256 for now.
return None;
}
let bytes = hash.digest().try_into().ok()?;
Some(Self(bytes))
}
/// Converts this fingerprint to [`Multihash`](multihash::Multihash).
pub fn to_multihash(self) -> Multihash {
Multihash::wrap(MULTIHASH_SHA256_CODE, &self.0).expect("fingerprint's len to be 32 bytes")
}
/// Formats this fingerprint as uppercase hex, separated by colons (`:`).
///
/// This is the format described in <https://www.rfc-editor.org/rfc/rfc4572#section-5>.
pub fn to_sdp_format(self) -> String {
self.0.map(|byte| format!("{byte:02X}")).join(":")
}
/// Returns the algorithm used (e.g. "sha-256").
/// See <https://datatracker.ietf.org/doc/html/rfc8122#section-5>
pub fn algorithm(&self) -> String {
SHA256.to_owned()
}
}
impl fmt::Debug for Fingerprint {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&hex::encode(self.0))
}
}
#[cfg(test)]
mod tests {
use super::*;
const SDP_FORMAT: &str = "7D:E3:D8:3F:81:A6:80:59:2A:47:1E:6B:6A:BB:07:47:AB:D3:53:85:A8:09:3F:DF:E1:12:C1:EE:BB:6C:C6:AC";
const REGULAR_FORMAT: [u8; 32] =
hex_literal::hex!("7DE3D83F81A680592A471E6B6ABB0747ABD35385A8093FDFE112C1EEBB6CC6AC");
#[test]
fn sdp_format() {
let fp = Fingerprint::raw(REGULAR_FORMAT);
let formatted = fp.to_sdp_format();
assert_eq!(formatted, SDP_FORMAT)
}
#[test]
fn from_sdp() {
let mut bytes = [0; 32];
bytes.copy_from_slice(&hex::decode(SDP_FORMAT.replace(':', "")).unwrap());
let fp = Fingerprint::raw(bytes);
assert_eq!(fp, Fingerprint::raw(REGULAR_FORMAT));
}
}

View File

@ -0,0 +1,20 @@
syntax = "proto2";
package webrtc.pb;
message Message {
enum Flag {
// The sender will no longer send messages on the stream.
FIN = 0;
// The sender will no longer read messages on the stream. Incoming data is
// being discarded on receipt.
STOP_SENDING = 1;
// The sender abruptly terminates the sending part of the stream. The
// receiver can discard any data that it already received on that stream.
RESET = 2;
}
optional Flag flag=1;
optional bytes message = 2;
}

View File

@ -0,0 +1,2 @@
// Automatically generated mod.rs
pub mod webrtc;

View File

@ -0,0 +1,2 @@
// Automatically generated mod.rs
pub mod pb;

View File

@ -0,0 +1,91 @@
// Automatically generated rust module for 'message.proto' file
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(unused_imports)]
#![allow(unknown_lints)]
#![allow(clippy::all)]
#![cfg_attr(rustfmt, rustfmt_skip)]
use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result};
use quick_protobuf::sizeofs::*;
use super::super::*;
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct Message {
pub flag: Option<webrtc::pb::mod_Message::Flag>,
pub message: Option<Vec<u8>>,
}
impl<'a> MessageRead<'a> for Message {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(8) => msg.flag = Some(r.read_enum(bytes)?),
Ok(18) => msg.message = Some(r.read_bytes(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for Message {
fn get_size(&self) -> usize {
0
+ self.flag.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64))
+ self.message.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.flag { w.write_with_tag(8, |w| w.write_enum(*s as i32))?; }
if let Some(ref s) = self.message { w.write_with_tag(18, |w| w.write_bytes(&**s))?; }
Ok(())
}
}
pub mod mod_Message {
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Flag {
FIN = 0,
STOP_SENDING = 1,
RESET = 2,
}
impl Default for Flag {
fn default() -> Self {
Flag::FIN
}
}
impl From<i32> for Flag {
fn from(i: i32) -> Self {
match i {
0 => Flag::FIN,
1 => Flag::STOP_SENDING,
2 => Flag::RESET,
_ => Self::default(),
}
}
}
impl<'a> From<&'a str> for Flag {
fn from(s: &'a str) -> Self {
match s {
"FIN" => Flag::FIN,
"STOP_SENDING" => Flag::STOP_SENDING,
"RESET" => Flag::RESET,
_ => Self::default(),
}
}
}
}

View File

@ -0,0 +1,15 @@
mod proto {
#![allow(unreachable_pub)]
include!("generated/mod.rs");
pub use self::webrtc::pb::{mod_Message::Flag, Message};
}
mod fingerprint;
pub mod noise;
pub mod sdp;
mod stream;
mod transport;
pub use fingerprint::{Fingerprint, SHA256};
pub use stream::{DropListener, Stream, MAX_MSG_LEN};
pub use transport::parse_webrtc_dial_addr;

View File

@ -0,0 +1,107 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_identity as identity;
use libp2p_identity::PeerId;
use libp2p_noise as noise;
use crate::fingerprint::Fingerprint;
pub async fn inbound<T>(
id_keys: identity::Keypair,
stream: T,
client_fingerprint: Fingerprint,
server_fingerprint: Fingerprint,
) -> Result<PeerId, libp2p_noise::Error>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let noise = noise::Config::new(&id_keys)
.unwrap()
.with_prologue(noise_prologue(client_fingerprint, server_fingerprint));
let info = noise.protocol_info().next().unwrap();
// Note the roles are reversed because it allows the server (webrtc connection responder) to
// send application data 0.5 RTT earlier.
let (peer_id, mut channel) = noise.upgrade_outbound(stream, info).await?;
channel.close().await?;
Ok(peer_id)
}
pub async fn outbound<T>(
id_keys: identity::Keypair,
stream: T,
server_fingerprint: Fingerprint,
client_fingerprint: Fingerprint,
) -> Result<PeerId, libp2p_noise::Error>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let noise = noise::Config::new(&id_keys)
.unwrap()
.with_prologue(noise_prologue(client_fingerprint, server_fingerprint));
let info = noise.protocol_info().next().unwrap();
// Note the roles are reversed because it allows the server (webrtc connection responder) to
// send application data 0.5 RTT earlier.
let (peer_id, mut channel) = noise.upgrade_inbound(stream, info).await?;
channel.close().await?;
Ok(peer_id)
}
pub(crate) fn noise_prologue(
client_fingerprint: Fingerprint,
server_fingerprint: Fingerprint,
) -> Vec<u8> {
let client = client_fingerprint.to_multihash().to_bytes();
let server = server_fingerprint.to_multihash().to_bytes();
const PREFIX: &[u8] = b"libp2p-webrtc-noise:";
let mut out = Vec::with_capacity(PREFIX.len() + client.len() + server.len());
out.extend_from_slice(PREFIX);
out.extend_from_slice(&client);
out.extend_from_slice(&server);
out
}
#[cfg(test)]
mod tests {
use super::*;
use hex_literal::hex;
#[test]
fn noise_prologue_tests() {
let a = Fingerprint::raw(hex!(
"3e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870"
));
let b = Fingerprint::raw(hex!(
"30fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b99"
));
let prologue1 = noise_prologue(a, b);
let prologue2 = noise_prologue(b, a);
assert_eq!(hex::encode(prologue1), "6c69627032702d7765627274632d6e6f6973653a12203e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870122030fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b99");
assert_eq!(hex::encode(prologue2), "6c69627032702d7765627274632d6e6f6973653a122030fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b9912203e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870");
}
}

View File

@ -0,0 +1,157 @@
// Copyright 2023 Doug Anderson
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::fingerprint::Fingerprint;
use serde::Serialize;
use std::net::{IpAddr, SocketAddr};
use tinytemplate::TinyTemplate;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
pub fn answer(addr: SocketAddr, server_fingerprint: Fingerprint, client_ufrag: &str) -> String {
let answer = render_description(
SERVER_SESSION_DESCRIPTION,
addr,
server_fingerprint,
client_ufrag,
);
log::trace!("Created SDP answer: {answer}");
answer
}
// See [`CLIENT_SESSION_DESCRIPTION`].
//
// a=ice-lite
//
// A lite implementation is only appropriate for devices that will *always* be connected to
// the public Internet and have a public IP address at which it can receive packets from any
// correspondent. ICE will not function when a lite implementation is placed behind a NAT
// (RFC8445).
//
// a=tls-id:<id>
//
// "TLS ID" uniquely identifies a TLS association.
// The ICE protocol uses a "TLS ID" system to indicate whether a fresh DTLS connection
// must be reopened in case of ICE renegotiation. Considering that ICE renegotiations
// never happen in our use case, we can simply put a random value and not care about
// it. Note however that the TLS ID in the answer must be present if and only if the
// offer contains one. (RFC8842)
// TODO: is it true that renegotiations never happen? what about a connection closing?
// "tls-id" attribute MUST be present in the initial offer and respective answer (RFC8839).
// XXX: but right now browsers don't send it.
//
// a=setup:passive
//
// "passive" indicates that the remote DTLS server will only listen for incoming
// connections. (RFC5763)
// The answerer (server) MUST not be located behind a NAT (RFC6135).
//
// The answerer MUST use either a setup attribute value of setup:active or setup:passive.
// Note that if the answerer uses setup:passive, then the DTLS handshake will not begin until
// the answerer is received, which adds additional latency. setup:active allows the answer and
// the DTLS handshake to occur in parallel. Thus, setup:active is RECOMMENDED.
//
// a=candidate:<foundation> <component-id> <transport> <priority> <connection-address> <port> <cand-type>
//
// A transport address for a candidate that can be used for connectivity checks (RFC8839).
//
// a=end-of-candidates
const SERVER_SESSION_DESCRIPTION: &str = "v=0
o=- 0 0 IN {ip_version} {target_ip}
s=-
t=0 0
a=ice-lite
m=application {target_port} UDP/DTLS/SCTP webrtc-datachannel
c=IN {ip_version} {target_ip}
a=mid:0
a=ice-options:ice2
a=ice-ufrag:{ufrag}
a=ice-pwd:{pwd}
a=fingerprint:{fingerprint_algorithm} {fingerprint_value}
a=setup:passive
a=sctp-port:5000
a=max-message-size:16384
a=candidate:1467250027 1 UDP 1467250027 {target_ip} {target_port} typ host
a=end-of-candidates
";
/// Indicates the IP version used in WebRTC: `IP4` or `IP6`.
#[derive(Serialize)]
enum IpVersion {
IP4,
IP6,
}
/// Context passed to the templating engine, which replaces the above placeholders (e.g.
/// `{IP_VERSION}`) with real values.
#[derive(Serialize)]
struct DescriptionContext {
pub(crate) ip_version: IpVersion,
pub(crate) target_ip: IpAddr,
pub(crate) target_port: u16,
pub(crate) fingerprint_algorithm: String,
pub(crate) fingerprint_value: String,
pub(crate) ufrag: String,
pub(crate) pwd: String,
}
/// Renders a [`TinyTemplate`] description using the provided arguments.
pub fn render_description(
description: &str,
addr: SocketAddr,
fingerprint: Fingerprint,
ufrag: &str,
) -> String {
let mut tt = TinyTemplate::new();
tt.add_template("description", description).unwrap();
let context = DescriptionContext {
ip_version: {
if addr.is_ipv4() {
IpVersion::IP4
} else {
IpVersion::IP6
}
},
target_ip: addr.ip(),
target_port: addr.port(),
fingerprint_algorithm: fingerprint.algorithm(),
fingerprint_value: fingerprint.to_sdp_format(),
// NOTE: ufrag is equal to pwd.
ufrag: ufrag.to_owned(),
pwd: ufrag.to_owned(),
};
tt.render("description", &context).unwrap()
}
/// Generates a random ufrag and adds a prefix according to the spec.
pub fn random_ufrag() -> String {
format!(
"libp2p+webrtc+v1/{}",
thread_rng()
.sample_iter(&Alphanumeric)
.take(64)
.map(char::from)
.collect::<String>()
)
}

View File

@ -0,0 +1,295 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
// Copyright 2023 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use futures::{channel::oneshot, prelude::*, ready};
use std::{
io,
pin::Pin,
task::{Context, Poll},
};
use crate::proto::{Flag, Message};
use crate::{
stream::drop_listener::GracefullyClosed,
stream::framed_dc::FramedDc,
stream::state::{Closing, State},
};
mod drop_listener;
mod framed_dc;
mod state;
/// Maximum length of a message.
///
/// "As long as message interleaving is not supported, the sender SHOULD limit the maximum message
/// size to 16 KB to avoid monopolization."
/// Source: <https://www.rfc-editor.org/rfc/rfc8831#name-transferring-user-data-on-a>
pub const MAX_MSG_LEN: usize = 16 * 1024;
/// Length of varint, in bytes.
const VARINT_LEN: usize = 2;
/// Overhead of the protobuf encoding, in bytes.
const PROTO_OVERHEAD: usize = 5;
/// Maximum length of data, in bytes.
const MAX_DATA_LEN: usize = MAX_MSG_LEN - VARINT_LEN - PROTO_OVERHEAD;
pub use drop_listener::DropListener;
/// A stream backed by a WebRTC data channel.
///
/// To be a proper libp2p stream, we need to implement [`AsyncRead`] and [`AsyncWrite`] as well
/// as support a half-closed state which we do by framing messages in a protobuf envelope.
pub struct Stream<T> {
io: FramedDc<T>,
state: State,
read_buffer: Bytes,
/// Dropping this will close the oneshot and notify the receiver by emitting `Canceled`.
drop_notifier: Option<oneshot::Sender<GracefullyClosed>>,
}
impl<T> Stream<T>
where
T: AsyncRead + AsyncWrite + Unpin + Clone,
{
/// Returns a new [`Stream`] and a [`DropListener`], which will notify the receiver when/if the stream is dropped.
pub fn new(data_channel: T) -> (Self, DropListener<T>) {
let (sender, receiver) = oneshot::channel();
let stream = Self {
io: framed_dc::new(data_channel.clone()),
state: State::Open,
read_buffer: Bytes::default(),
drop_notifier: Some(sender),
};
let listener = DropListener::new(framed_dc::new(data_channel), receiver);
(stream, listener)
}
/// Gracefully closes the "read-half" of the stream.
pub fn poll_close_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
loop {
match self.state.close_read_barrier()? {
Some(Closing::Requested) => {
ready!(self.io.poll_ready_unpin(cx))?;
self.io.start_send_unpin(Message {
flag: Some(Flag::STOP_SENDING),
message: None,
})?;
self.state.close_read_message_sent();
continue;
}
Some(Closing::MessageSent) => {
ready!(self.io.poll_flush_unpin(cx))?;
self.state.read_closed();
return Poll::Ready(Ok(()));
}
None => return Poll::Ready(Ok(())),
}
}
}
}
impl<T> AsyncRead for Stream<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
loop {
self.state.read_barrier()?;
if !self.read_buffer.is_empty() {
let n = std::cmp::min(self.read_buffer.len(), buf.len());
let data = self.read_buffer.split_to(n);
buf[0..n].copy_from_slice(&data[..]);
return Poll::Ready(Ok(n));
}
let Self {
read_buffer,
io,
state,
..
} = &mut *self;
match ready!(io_poll_next(io, cx))? {
Some((flag, message)) => {
if let Some(flag) = flag {
state.handle_inbound_flag(flag, read_buffer);
}
debug_assert!(read_buffer.is_empty());
if let Some(message) = message {
*read_buffer = message.into();
}
}
None => {
state.handle_inbound_flag(Flag::FIN, read_buffer);
return Poll::Ready(Ok(0));
}
}
}
}
}
impl<T> AsyncWrite for Stream<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
while self.state.read_flags_in_async_write() {
// TODO: In case AsyncRead::poll_read encountered an error or returned None earlier, we will poll the
// underlying I/O resource once more. Is that allowed? How about introducing a state IoReadClosed?
let Self {
read_buffer,
io,
state,
..
} = &mut *self;
match io_poll_next(io, cx)? {
Poll::Ready(Some((Some(flag), message))) => {
// Read side is closed. Discard any incoming messages.
drop(message);
// But still handle flags, e.g. a `Flag::StopSending`.
state.handle_inbound_flag(flag, read_buffer)
}
Poll::Ready(Some((None, message))) => drop(message),
Poll::Ready(None) | Poll::Pending => break,
}
}
self.state.write_barrier()?;
ready!(self.io.poll_ready_unpin(cx))?;
let n = usize::min(buf.len(), MAX_DATA_LEN);
Pin::new(&mut self.io).start_send(Message {
flag: None,
message: Some(buf[0..n].into()),
})?;
Poll::Ready(Ok(n))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.poll_flush_unpin(cx).map_err(Into::into)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
loop {
match self.state.close_write_barrier()? {
Some(Closing::Requested) => {
ready!(self.io.poll_ready_unpin(cx))?;
self.io.start_send_unpin(Message {
flag: Some(Flag::FIN),
message: None,
})?;
self.state.close_write_message_sent();
continue;
}
Some(Closing::MessageSent) => {
ready!(self.io.poll_flush_unpin(cx))?;
self.state.write_closed();
let _ = self
.drop_notifier
.take()
.expect("to not close twice")
.send(GracefullyClosed {});
return Poll::Ready(Ok(()));
}
None => return Poll::Ready(Ok(())),
}
}
}
}
fn io_poll_next<T>(
io: &mut FramedDc<T>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Option<(Option<Flag>, Option<Vec<u8>>)>>>
where
T: AsyncRead + AsyncWrite + Unpin,
{
match ready!(io.poll_next_unpin(cx))
.transpose()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
{
Some(Message { flag, message }) => Poll::Ready(Ok(Some((flag, message)))),
None => Poll::Ready(Ok(None)),
}
}
#[cfg(test)]
mod tests {
use super::*;
use asynchronous_codec::Encoder;
use bytes::BytesMut;
use quick_protobuf::{MessageWrite, Writer};
use unsigned_varint::codec::UviBytes;
#[test]
fn max_data_len() {
// Largest possible message.
let message = [0; MAX_DATA_LEN];
let protobuf = Message {
flag: Some(Flag::FIN),
message: Some(message.to_vec()),
};
let mut encoded_msg = Vec::new();
let mut writer = Writer::new(&mut encoded_msg);
protobuf
.write_message(&mut writer)
.expect("Encoding to succeed");
assert_eq!(encoded_msg.len(), message.len() + PROTO_OVERHEAD);
let mut uvi = UviBytes::default();
let mut dst = BytesMut::new();
uvi.encode(encoded_msg.as_slice(), &mut dst).unwrap();
// Ensure the varint prefixed and protobuf encoded largest message is no longer than the
// maximum limit specified in the libp2p WebRTC specification.
assert_eq!(dst.len(), MAX_MSG_LEN);
assert_eq!(dst.len() - encoded_msg.len(), VARINT_LEN);
}
}

View File

@ -0,0 +1,121 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::channel::oneshot;
use futures::channel::oneshot::Canceled;
use futures::{AsyncRead, AsyncWrite, FutureExt, SinkExt};
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::proto::{Flag, Message};
use crate::stream::framed_dc::FramedDc;
#[must_use]
pub struct DropListener<T> {
state: State<T>,
}
impl<T> DropListener<T> {
pub fn new(stream: FramedDc<T>, receiver: oneshot::Receiver<GracefullyClosed>) -> Self {
Self {
state: State::Idle { stream, receiver },
}
}
}
enum State<T> {
/// The [`DropListener`] is idle and waiting to be activated.
Idle {
stream: FramedDc<T>,
receiver: oneshot::Receiver<GracefullyClosed>,
},
/// The stream got dropped and we are sending a reset flag.
SendingReset {
stream: FramedDc<T>,
},
Flushing {
stream: FramedDc<T>,
},
/// Bad state transition.
Poisoned,
}
impl<T> Future for DropListener<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let state = &mut self.get_mut().state;
loop {
match std::mem::replace(state, State::Poisoned) {
State::Idle {
stream,
mut receiver,
} => match receiver.poll_unpin(cx) {
Poll::Ready(Ok(GracefullyClosed {})) => {
return Poll::Ready(Ok(()));
}
Poll::Ready(Err(Canceled)) => {
log::info!("Stream dropped without graceful close, sending Reset");
*state = State::SendingReset { stream };
continue;
}
Poll::Pending => {
*state = State::Idle { stream, receiver };
return Poll::Pending;
}
},
State::SendingReset { mut stream } => match stream.poll_ready_unpin(cx)? {
Poll::Ready(()) => {
stream.start_send_unpin(Message {
flag: Some(Flag::RESET),
message: None,
})?;
*state = State::Flushing { stream };
continue;
}
Poll::Pending => {
*state = State::SendingReset { stream };
return Poll::Pending;
}
},
State::Flushing { mut stream } => match stream.poll_flush_unpin(cx)? {
Poll::Ready(()) => return Poll::Ready(Ok(())),
Poll::Pending => {
*state = State::Flushing { stream };
return Poll::Pending;
}
},
State::Poisoned => {
unreachable!()
}
}
}
}
}
/// Indicates that our stream got gracefully closed.
pub struct GracefullyClosed {}

View File

@ -0,0 +1,40 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use asynchronous_codec::Framed;
use futures::{AsyncRead, AsyncWrite};
use crate::proto::Message;
use crate::stream::{MAX_DATA_LEN, MAX_MSG_LEN, VARINT_LEN};
pub(crate) type FramedDc<T> = Framed<T, quick_protobuf_codec::Codec<Message>>;
pub(crate) fn new<T>(inner: T) -> FramedDc<T>
where
T: AsyncRead + AsyncWrite,
{
let mut framed = Framed::new(
inner,
quick_protobuf_codec::Codec::new(MAX_MSG_LEN - VARINT_LEN),
);
// If not set, `Framed` buffers up to 131kB of data before sending, which leads to "outbound
// packet larger than maximum message size" error in webrtc-rs.
framed.set_send_high_water_mark(MAX_DATA_LEN);
framed
}

View File

@ -0,0 +1,508 @@
// Copyright 2022 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use std::io;
use crate::proto::Flag;
#[derive(Debug, Copy, Clone)]
pub(crate) enum State {
Open,
ReadClosed,
WriteClosed,
ClosingRead {
/// Whether the write side of our channel was already closed.
write_closed: bool,
inner: Closing,
},
ClosingWrite {
/// Whether the write side of our channel was already closed.
read_closed: bool,
inner: Closing,
},
BothClosed {
reset: bool,
},
}
/// Represents the state of closing one half (either read or write) of the connection.
///
/// Gracefully closing the read or write requires sending the `STOP_SENDING` or `FIN` flag respectively
/// and flushing the underlying connection.
#[derive(Debug, Copy, Clone)]
pub(crate) enum Closing {
Requested,
MessageSent,
}
impl State {
/// Performs a state transition for a flag contained in an inbound message.
pub(crate) fn handle_inbound_flag(&mut self, flag: Flag, buffer: &mut Bytes) {
let current = *self;
match (current, flag) {
(Self::Open, Flag::FIN) => {
*self = Self::ReadClosed;
}
(Self::WriteClosed, Flag::FIN) => {
*self = Self::BothClosed { reset: false };
}
(Self::Open, Flag::STOP_SENDING) => {
*self = Self::WriteClosed;
}
(Self::ReadClosed, Flag::STOP_SENDING) => {
*self = Self::BothClosed { reset: false };
}
(_, Flag::RESET) => {
buffer.clear();
*self = Self::BothClosed { reset: true };
}
_ => {}
}
}
pub(crate) fn write_closed(&mut self) {
match self {
State::ClosingWrite {
read_closed: true,
inner,
} => {
debug_assert!(matches!(inner, Closing::MessageSent));
*self = State::BothClosed { reset: false };
}
State::ClosingWrite {
read_closed: false,
inner,
} => {
debug_assert!(matches!(inner, Closing::MessageSent));
*self = State::WriteClosed;
}
State::Open
| State::ReadClosed
| State::WriteClosed
| State::ClosingRead { .. }
| State::BothClosed { .. } => {
unreachable!("bad state machine impl")
}
}
}
pub(crate) fn close_write_message_sent(&mut self) {
match self {
State::ClosingWrite { inner, read_closed } => {
debug_assert!(matches!(inner, Closing::Requested));
*self = State::ClosingWrite {
read_closed: *read_closed,
inner: Closing::MessageSent,
};
}
State::Open
| State::ReadClosed
| State::WriteClosed
| State::ClosingRead { .. }
| State::BothClosed { .. } => {
unreachable!("bad state machine impl")
}
}
}
pub(crate) fn read_closed(&mut self) {
match self {
State::ClosingRead {
write_closed: true,
inner,
} => {
debug_assert!(matches!(inner, Closing::MessageSent));
*self = State::BothClosed { reset: false };
}
State::ClosingRead {
write_closed: false,
inner,
} => {
debug_assert!(matches!(inner, Closing::MessageSent));
*self = State::ReadClosed;
}
State::Open
| State::ReadClosed
| State::WriteClosed
| State::ClosingWrite { .. }
| State::BothClosed { .. } => {
unreachable!("bad state machine impl")
}
}
}
pub(crate) fn close_read_message_sent(&mut self) {
match self {
State::ClosingRead {
inner,
write_closed,
} => {
debug_assert!(matches!(inner, Closing::Requested));
*self = State::ClosingRead {
write_closed: *write_closed,
inner: Closing::MessageSent,
};
}
State::Open
| State::ReadClosed
| State::WriteClosed
| State::ClosingWrite { .. }
| State::BothClosed { .. } => {
unreachable!("bad state machine impl")
}
}
}
/// Whether we should read from the stream in the [`futures::AsyncWrite`] implementation.
///
/// This is necessary for read-closed streams because we would otherwise not read any more flags from
/// the socket.
pub(crate) fn read_flags_in_async_write(&self) -> bool {
matches!(self, Self::ReadClosed)
}
/// Acts as a "barrier" for [`futures::AsyncRead::poll_read`].
pub(crate) fn read_barrier(&self) -> io::Result<()> {
use State::*;
let kind = match self {
Open
| WriteClosed
| ClosingWrite {
read_closed: false, ..
} => return Ok(()),
ClosingWrite {
read_closed: true, ..
}
| ReadClosed
| ClosingRead { .. }
| BothClosed { reset: false } => io::ErrorKind::BrokenPipe,
BothClosed { reset: true } => io::ErrorKind::ConnectionReset,
};
Err(kind.into())
}
/// Acts as a "barrier" for [`futures::AsyncWrite::poll_write`].
pub(crate) fn write_barrier(&self) -> io::Result<()> {
use State::*;
let kind = match self {
Open
| ReadClosed
| ClosingRead {
write_closed: false,
..
} => return Ok(()),
ClosingRead {
write_closed: true, ..
}
| WriteClosed
| ClosingWrite { .. }
| BothClosed { reset: false } => io::ErrorKind::BrokenPipe,
BothClosed { reset: true } => io::ErrorKind::ConnectionReset,
};
Err(kind.into())
}
/// Acts as a "barrier" for [`futures::AsyncWrite::poll_close`].
pub(crate) fn close_write_barrier(&mut self) -> io::Result<Option<Closing>> {
loop {
match &self {
State::WriteClosed => return Ok(None),
State::ClosingWrite { inner, .. } => return Ok(Some(*inner)),
State::Open => {
*self = Self::ClosingWrite {
read_closed: false,
inner: Closing::Requested,
};
}
State::ReadClosed => {
*self = Self::ClosingWrite {
read_closed: true,
inner: Closing::Requested,
};
}
State::ClosingRead {
write_closed: true, ..
}
| State::BothClosed { reset: false } => {
return Err(io::ErrorKind::BrokenPipe.into())
}
State::ClosingRead {
write_closed: false,
..
} => {
return Err(io::Error::new(
io::ErrorKind::Other,
"cannot close read half while closing write half",
))
}
State::BothClosed { reset: true } => {
return Err(io::ErrorKind::ConnectionReset.into())
}
}
}
}
/// Acts as a "barrier" for [`Stream::poll_close_read`](super::Stream::poll_close_read).
pub(crate) fn close_read_barrier(&mut self) -> io::Result<Option<Closing>> {
loop {
match self {
State::ReadClosed => return Ok(None),
State::ClosingRead { inner, .. } => return Ok(Some(*inner)),
State::Open => {
*self = Self::ClosingRead {
write_closed: false,
inner: Closing::Requested,
};
}
State::WriteClosed => {
*self = Self::ClosingRead {
write_closed: true,
inner: Closing::Requested,
};
}
State::ClosingWrite {
read_closed: true, ..
}
| State::BothClosed { reset: false } => {
return Err(io::ErrorKind::BrokenPipe.into())
}
State::ClosingWrite {
read_closed: false, ..
} => {
return Err(io::Error::new(
io::ErrorKind::Other,
"cannot close write half while closing read half",
))
}
State::BothClosed { reset: true } => {
return Err(io::ErrorKind::ConnectionReset.into())
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::ErrorKind;
#[test]
fn cannot_read_after_receiving_fin() {
let mut open = State::Open;
open.handle_inbound_flag(Flag::FIN, &mut Bytes::default());
let error = open.read_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe)
}
#[test]
fn cannot_read_after_closing_read() {
let mut open = State::Open;
open.close_read_barrier().unwrap();
open.close_read_message_sent();
open.read_closed();
let error = open.read_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe)
}
#[test]
fn cannot_write_after_receiving_stop_sending() {
let mut open = State::Open;
open.handle_inbound_flag(Flag::STOP_SENDING, &mut Bytes::default());
let error = open.write_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe)
}
#[test]
fn cannot_write_after_closing_write() {
let mut open = State::Open;
open.close_write_barrier().unwrap();
open.close_write_message_sent();
open.write_closed();
let error = open.write_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe)
}
#[test]
fn everything_broken_after_receiving_reset() {
let mut open = State::Open;
open.handle_inbound_flag(Flag::RESET, &mut Bytes::default());
let error1 = open.read_barrier().unwrap_err();
let error2 = open.write_barrier().unwrap_err();
let error3 = open.close_write_barrier().unwrap_err();
let error4 = open.close_read_barrier().unwrap_err();
assert_eq!(error1.kind(), ErrorKind::ConnectionReset);
assert_eq!(error2.kind(), ErrorKind::ConnectionReset);
assert_eq!(error3.kind(), ErrorKind::ConnectionReset);
assert_eq!(error4.kind(), ErrorKind::ConnectionReset);
}
#[test]
fn should_read_flags_in_async_write_after_read_closed() {
let mut open = State::Open;
open.handle_inbound_flag(Flag::FIN, &mut Bytes::default());
assert!(open.read_flags_in_async_write())
}
#[test]
fn cannot_read_or_write_after_receiving_fin_and_stop_sending() {
let mut open = State::Open;
open.handle_inbound_flag(Flag::FIN, &mut Bytes::default());
open.handle_inbound_flag(Flag::STOP_SENDING, &mut Bytes::default());
let error1 = open.read_barrier().unwrap_err();
let error2 = open.write_barrier().unwrap_err();
assert_eq!(error1.kind(), ErrorKind::BrokenPipe);
assert_eq!(error2.kind(), ErrorKind::BrokenPipe);
}
#[test]
fn can_read_after_closing_write() {
let mut open = State::Open;
open.close_write_barrier().unwrap();
open.close_write_message_sent();
open.write_closed();
open.read_barrier().unwrap();
}
#[test]
fn can_write_after_closing_read() {
let mut open = State::Open;
open.close_read_barrier().unwrap();
open.close_read_message_sent();
open.read_closed();
open.write_barrier().unwrap();
}
#[test]
fn cannot_write_after_starting_close() {
let mut open = State::Open;
open.close_write_barrier().expect("to close in open");
let error = open.write_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe);
}
#[test]
fn cannot_read_after_starting_close() {
let mut open = State::Open;
open.close_read_barrier().expect("to close in open");
let error = open.read_barrier().unwrap_err();
assert_eq!(error.kind(), ErrorKind::BrokenPipe);
}
#[test]
fn can_read_in_open() {
let open = State::Open;
let result = open.read_barrier();
result.unwrap();
}
#[test]
fn can_write_in_open() {
let open = State::Open;
let result = open.write_barrier();
result.unwrap();
}
#[test]
fn write_close_barrier_returns_ok_when_closed() {
let mut open = State::Open;
open.close_write_barrier().unwrap();
open.close_write_message_sent();
open.write_closed();
let maybe = open.close_write_barrier().unwrap();
assert!(maybe.is_none())
}
#[test]
fn read_close_barrier_returns_ok_when_closed() {
let mut open = State::Open;
open.close_read_barrier().unwrap();
open.close_read_message_sent();
open.read_closed();
let maybe = open.close_read_barrier().unwrap();
assert!(maybe.is_none())
}
#[test]
fn reset_flag_clears_buffer() {
let mut open = State::Open;
let mut buffer = Bytes::copy_from_slice(b"foobar");
open.handle_inbound_flag(Flag::RESET, &mut buffer);
assert!(buffer.is_empty());
}
}

View File

@ -0,0 +1,101 @@
use crate::fingerprint::Fingerprint;
use libp2p_core::{multiaddr::Protocol, Multiaddr};
use std::net::{IpAddr, SocketAddr};
/// Parse the given [`Multiaddr`] into a [`SocketAddr`] and a [`Fingerprint`] for dialing.
pub fn parse_webrtc_dial_addr(addr: &Multiaddr) -> Option<(SocketAddr, Fingerprint)> {
let mut iter = addr.iter();
let ip = match iter.next()? {
Protocol::Ip4(ip) => IpAddr::from(ip),
Protocol::Ip6(ip) => IpAddr::from(ip),
_ => return None,
};
let port = iter.next()?;
let webrtc = iter.next()?;
let certhash = iter.next()?;
let (port, fingerprint) = match (port, webrtc, certhash) {
(Protocol::Udp(port), Protocol::WebRTCDirect, Protocol::Certhash(cert_hash)) => {
let fingerprint = Fingerprint::try_from_multihash(cert_hash)?;
(port, fingerprint)
}
_ => return None,
};
match iter.next() {
Some(Protocol::P2p(_)) => {}
// peer ID is optional
None => {}
// unexpected protocol
Some(_) => return None,
}
Some((SocketAddr::new(ip, port), fingerprint))
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::{Ipv4Addr, Ipv6Addr};
#[test]
fn parse_valid_address_with_certhash_and_p2p() {
let addr = "/ip4/127.0.0.1/udp/39901/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/p2p/12D3KooWNpDk9w6WrEEcdsEH1y47W71S36yFjw4sd3j7omzgCSMS"
.parse()
.unwrap();
let maybe_parsed = parse_webrtc_dial_addr(&addr);
assert_eq!(
maybe_parsed,
Some((
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 39901),
Fingerprint::raw(hex_literal::hex!(
"e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb"
))
))
);
}
#[test]
fn peer_id_is_not_required() {
let addr = "/ip4/127.0.0.1/udp/39901/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w"
.parse()
.unwrap();
let maybe_parsed = parse_webrtc_dial_addr(&addr);
assert_eq!(
maybe_parsed,
Some((
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 39901),
Fingerprint::raw(hex_literal::hex!(
"e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb"
))
))
);
}
#[test]
fn parse_ipv6() {
let addr =
"/ip6/::1/udp/12345/webrtc-direct/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/p2p/12D3KooWNpDk9w6WrEEcdsEH1y47W71S36yFjw4sd3j7omzgCSMS"
.parse()
.unwrap();
let maybe_parsed = parse_webrtc_dial_addr(&addr);
assert_eq!(
maybe_parsed,
Some((
SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 12345),
Fingerprint::raw(hex_literal::hex!(
"e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb"
))
))
);
}
}