diff --git a/Cargo.toml b/Cargo.toml index c1c756a6..b8b1eab5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ libp2p-core = { version = "0.7.0", path = "./core" } libp2p-core-derive = { version = "0.7.0", path = "./misc/core-derive" } libp2p-secio = { version = "0.7.0", path = "./protocols/secio", default-features = false } libp2p-uds = { version = "0.7.0", path = "./transports/uds" } +libp2p-wasm-ext = { version = "0.1.0", path = "./transports/wasm-ext" } libp2p-websocket = { version = "0.7.0", path = "./transports/websocket", optional = true } libp2p-yamux = { version = "0.7.0", path = "./muxers/yamux" } parking_lot = "0.7" @@ -76,5 +77,6 @@ members = [ "transports/ratelimit", "transports/tcp", "transports/uds", - "transports/websocket" + "transports/websocket", + "transports/wasm-ext" ] diff --git a/transports/wasm-ext/Cargo.toml b/transports/wasm-ext/Cargo.toml new file mode 100644 index 00000000..141c9ab1 --- /dev/null +++ b/transports/wasm-ext/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "libp2p-wasm-ext" +version = "0.1.0" +authors = ["Pierre Krieger "] +edition = "2018" +description = "Allows passing in an external transport in a WASM environment" +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +futures = "0.1" +js-sys = "0.3.19" +libp2p-core = { version = "0.7.0", path = "../../core" } +send_wrapper = "0.2.0" +tokio-io = "0.1" +wasm-bindgen = "0.2.42" +wasm-bindgen-futures = "0.3.19" diff --git a/transports/wasm-ext/src/lib.rs b/transports/wasm-ext/src/lib.rs new file mode 100644 index 00000000..8577590d --- /dev/null +++ b/transports/wasm-ext/src/lib.rs @@ -0,0 +1,518 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the libp2p `Transport` trait for external transports. +//! +//! This `Transport` is used in the context of WASM to allow delegating the transport mechanism +//! to the code that uses rust-libp2p, as opposed to inside of rust-libp2p itself. +//! +//! > **Note**: This only allows transports that produce a raw stream with the remote. You +//! > couldn't, for example, pass an implementation QUIC. +//! +//! # Usage +//! +//! Call `new()` with a JavaScript object that implements the interface described in the `ffi` +//! module. +//! + +use futures::{future::FutureResult, prelude::*, stream::Stream, try_ready}; +use libp2p_core::{transport::ListenerEvent, transport::TransportError, Multiaddr, Transport}; +use send_wrapper::SendWrapper; +use std::{collections::VecDeque, error, fmt, io, mem}; +use wasm_bindgen::{JsCast, prelude::*}; + +/// Contains the definition that one must match on the JavaScript side. +pub mod ffi { + use wasm_bindgen::prelude::*; + + #[wasm_bindgen] + extern "C" { + /// Type of the object that allows opening connections. + pub type Transport; + /// Type of the object that represents an open connection with a remote. + pub type Connection; + /// Type of the object that represents an event generated by listening. + pub type ListenEvent; + /// Type of the object that represents an event containing a new connection with a remote. + pub type ConnectionEvent; + + /// Start attempting to dial the given multiaddress. + /// + /// The returned `Promise` must yield a [`Connection`] on success. + /// + /// If the multiaddress is not supported, you should return an instance of `Error` whose + /// `name` property has been set to the string `"NotSupportedError"`. + #[wasm_bindgen(method, catch)] + pub fn dial(this: &Transport, multiaddr: &str) -> Result; + + /// Start listening on the given multiaddress. + /// + /// The returned `Iterator` must yield `Promise`s to [`ListenEvent`] events. + /// + /// If the multiaddress is not supported, you should return an instance of `Error` whose + /// `name` property has been set to the string `"NotSupportedError"`. + #[wasm_bindgen(method, catch)] + pub fn listen_on(this: &Transport, multiaddr: &str) -> Result; + + /// Returns a `Readable​Stream​` that . + #[wasm_bindgen(method, getter)] + pub fn read(this: &Connection) -> js_sys::Iterator; + + /// Writes data to the connection. Returns a `Promise` that resolves when the connection is + /// ready for writing again. + /// + /// If the `Promise` returns an error, the writing side of the connection is considered + /// unrecoverable and the connection should be closed as soon as possible. + /// + /// Guaranteed to only be called after the previous write promise has resolved. + #[wasm_bindgen(method, catch)] + pub fn write(this: &Connection, data: &[u8]) -> Result; + + /// Shuts down the writing side of the connection. After this has been called, the `write` + /// method will no longer be called. + #[wasm_bindgen(method, catch)] + pub fn shutdown(this: &Connection) -> Result<(), JsValue>; + + /// Closes the connection. No other method will be called on this connection anymore. + #[wasm_bindgen(method)] + pub fn close(this: &Connection); + + /// List of addresses we have started listening on. Must be an array of strings of multiaddrs. + #[wasm_bindgen(method, getter)] + pub fn new_addrs(this: &ListenEvent) -> Option>; + + /// List of addresses that have expired. Must be an array of strings of multiaddrs. + #[wasm_bindgen(method, getter)] + pub fn expired_addrs(this: &ListenEvent) -> Option>; + + /// List of [`ConnectionEvent`] object that has been received. + #[wasm_bindgen(method, getter)] + pub fn new_connections(this: &ListenEvent) -> Option>; + + /// Promise to the next event that the listener will generate. + #[wasm_bindgen(method, getter)] + pub fn next_event(this: &ListenEvent) -> JsValue; + + /// The [`Connection`] object for communication with the remote. + #[wasm_bindgen(method, getter)] + pub fn connection(this: &ConnectionEvent) -> Connection; + + /// The address we observe for the remote connection. + #[wasm_bindgen(method, getter)] + pub fn observed_addr(this: &ConnectionEvent) -> String; + + /// The address we are listening on, that received the remote connection. + #[wasm_bindgen(method, getter)] + pub fn local_addr(this: &ConnectionEvent) -> String; + } +} + +/// Implementation of `Transport` whose implementation is handled by some FFI. +pub struct ExtTransport { + inner: SendWrapper, +} + +impl ExtTransport { + /// Creates a new `ExtTransport` that uses the given external `Transport`. + pub fn new(transport: ffi::Transport) -> Self { + ExtTransport { + inner: SendWrapper::new(transport), + } + } +} + +impl fmt::Debug for ExtTransport { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("ExtTransport").finish() + } +} + +impl Clone for ExtTransport { + fn clone(&self) -> Self { + ExtTransport { + inner: SendWrapper::new(self.inner.clone().into()), + } + } +} + +impl Transport for ExtTransport { + type Output = Connection; + type Error = JsErr; + type Listener = Listen; + type ListenerUpgrade = FutureResult; + type Dial = Dial; + + fn listen_on(self, addr: Multiaddr) -> Result> { + let iter = self + .inner + .listen_on(&addr.to_string()) + .map_err(|err| { + if is_not_supported_error(&err) { + TransportError::MultiaddrNotSupported(addr) + } else { + TransportError::Other(JsErr::from(err)) + } + })?; + + Ok(Listen { + iterator: SendWrapper::new(iter), + next_event: None, + pending_events: VecDeque::new(), + }) + } + + fn dial(self, addr: Multiaddr) -> Result> { + let promise = self + .inner + .dial(&addr.to_string()) + .map_err(|err| { + if is_not_supported_error(&err) { + TransportError::MultiaddrNotSupported(addr) + } else { + TransportError::Other(JsErr::from(err)) + } + })?; + + Ok(Dial { + inner: SendWrapper::new(promise.into()), + }) + } +} + +/// Future that dial a remote through an external transport. +#[must_use = "futures do nothing unless polled"] +pub struct Dial { + /// A promise that will resolve to a `ffi::Connection` on success. + inner: SendWrapper, +} + +impl fmt::Debug for Dial { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("Dial").finish() + } +} + +impl Future for Dial { + type Item = Connection; + type Error = JsErr; + + fn poll(&mut self) -> Poll { + match self.inner.poll() { + Ok(Async::Ready(connec)) => Ok(Async::Ready(Connection::new(connec.into()))), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(JsErr::from(err)), + } + } +} + +/// Stream that listens for incoming connections through an external transport. +#[must_use = "futures do nothing unless polled"] +pub struct Listen { + /// Iterator of `ListenEvent`s. + iterator: SendWrapper, + /// Promise that will yield the next `ListenEvent`. + next_event: Option>, + /// List of events that we are waiting to propagate. + pending_events: VecDeque>>, +} + +impl fmt::Debug for Listen { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("Listen").finish() + } +} + +impl Stream for Listen { + type Item = ListenerEvent>; + type Error = JsErr; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + if let Some(ev) = self.pending_events.pop_front() { + return Ok(Async::Ready(Some(ev))); + } + + if self.next_event.is_none() { + let ev = self.iterator.next()?; + if !ev.done() { + let promise: js_sys::Promise = ev.value().into(); + self.next_event = Some(SendWrapper::new(promise.into())); + } + } + + let event = if let Some(next_event) = self.next_event.as_mut() { + let e = ffi::ListenEvent::from(try_ready!(next_event.poll())); + self.next_event = None; + e + } else { + return Ok(Async::Ready(None)); + }; + + for addr in event + .new_addrs() + .into_iter() + .flat_map(|e| e.to_vec().into_iter()) + { + let addr = js_value_to_addr(&addr)?; + self.pending_events + .push_back(ListenerEvent::NewAddress(addr)); + } + + for upgrade in event + .new_connections() + .into_iter() + .flat_map(|e| e.to_vec().into_iter()) + { + let upgrade: ffi::ConnectionEvent = upgrade.into(); + self.pending_events.push_back(ListenerEvent::Upgrade { + listen_addr: upgrade.local_addr().parse()?, + remote_addr: upgrade.observed_addr().parse()?, + upgrade: futures::future::ok(Connection::new(upgrade.connection())), + }); + } + + for addr in event + .expired_addrs() + .into_iter() + .flat_map(|e| e.to_vec().into_iter()) + { + let addr = js_value_to_addr(&addr)?; + self.pending_events + .push_back(ListenerEvent::AddressExpired(addr)); + } + } + } +} + +/// Active stream of data with a remote. +pub struct Connection { + /// The FFI object. + inner: SendWrapper, + + /// The iterator that was returned by `read()`. + read_iterator: SendWrapper, + + /// Reading part of the connection. + read_state: ConnectionReadState, + + /// When we write data using the FFI, a promise is returned containing the moment when the + /// underlying transport is ready to accept data again. This promise is stored here. + /// If this is `Some`, we must wait until the contained promise is resolved to write again. + previous_write_promise: Option>, +} + +impl Connection { + /// Initializes a `Connection` object from the FFI connection. + fn new(inner: ffi::Connection) -> Self { + let read_iterator = inner.read(); + + Connection { + inner: SendWrapper::new(inner), + read_iterator: SendWrapper::new(read_iterator), + read_state: ConnectionReadState::PendingData(Vec::new()), + previous_write_promise: None, + } + } +} + +/// Reading side of the connection. +enum ConnectionReadState { + /// Some data have been read and are waiting to be transferred. Can be empty. + PendingData(Vec), + /// Waiting for a `Promise` containing the next data. + Waiting(SendWrapper), + /// An error occurred or an earlier read yielded EOF. + Finished, +} + +impl fmt::Debug for Connection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("Connection").finish() + } +} + +impl io::Read for Connection { + fn read(&mut self, buf: &mut [u8]) -> Result { + loop { + match mem::replace(&mut self.read_state, ConnectionReadState::Finished) { + ConnectionReadState::Finished => break Err(io::ErrorKind::BrokenPipe.into()), + + ConnectionReadState::PendingData(ref data) if data.is_empty() => { + let iter_next = self.read_iterator.next().map_err(JsErr::from)?; + if iter_next.done() { + self.read_state = ConnectionReadState::Finished; + } else { + let promise: js_sys::Promise = iter_next.value().into(); + let promise = SendWrapper::new(promise.into()); + self.read_state = ConnectionReadState::Waiting(promise); + } + continue; + } + + ConnectionReadState::PendingData(mut data) => { + debug_assert!(!data.is_empty()); + if buf.len() <= data.len() { + buf.copy_from_slice(&data[..buf.len()]); + self.read_state = + ConnectionReadState::PendingData(data.split_off(buf.len())); + break Ok(buf.len()); + } else { + let len = data.len(); + buf[..len].copy_from_slice(&data); + self.read_state = ConnectionReadState::PendingData(Vec::new()); + break Ok(len); + } + } + + ConnectionReadState::Waiting(mut promise) => { + let data = match promise.poll().map_err(JsErr::from)? { + Async::Ready(ref data) if data.is_null() => break Ok(0), + Async::Ready(data) => data, + Async::NotReady => { + self.read_state = ConnectionReadState::Waiting(promise); + break Err(io::ErrorKind::WouldBlock.into()); + } + }; + + // Try to directly copy the data into `buf` if it is large enough, otherwise + // transition to `PendingData` and loop again. + let data = js_sys::Uint8Array::new(&data); + let data_len = data.length() as usize; + if data_len <= buf.len() { + data.copy_to(&mut buf[..data_len]); + self.read_state = ConnectionReadState::PendingData(Vec::new()); + break Ok(data_len); + } else { + let mut tmp_buf = vec![0; data_len]; + data.copy_to(&mut tmp_buf[..]); + self.read_state = ConnectionReadState::PendingData(tmp_buf); + continue; + } + } + } + } + } +} + +impl tokio_io::AsyncRead for Connection {} + +impl io::Write for Connection { + fn write(&mut self, buf: &[u8]) -> Result { + if let Some(mut promise) = self.previous_write_promise.take() { + match promise.poll().map_err(JsErr::from)? { + Async::Ready(_) => (), + Async::NotReady => { + self.previous_write_promise = Some(promise); + return Err(io::ErrorKind::WouldBlock.into()); + } + } + } + + debug_assert!(self.previous_write_promise.is_none()); + self.previous_write_promise = Some(SendWrapper::new( + self.inner.write(buf).map_err(JsErr::from)?.into(), + )); + Ok(buf.len()) + } + + fn flush(&mut self) -> Result<(), io::Error> { + // There's no flushing mechanism. In the FFI we consider that writing implicitly flushes. + Ok(()) + } +} + +impl tokio_io::AsyncWrite for Connection { + fn shutdown(&mut self) -> Poll<(), io::Error> { + // Shutting down is considered instantaneous. + self.inner.shutdown().map_err(JsErr::from)?; + Ok(Async::Ready(())) + } +} + +impl Drop for Connection { + fn drop(&mut self) { + self.inner.close(); + } +} + +/// Returns true if `err` is an error about an address not being supported. +fn is_not_supported_error(err: &JsValue) -> bool { + if let Some(err) = err.dyn_ref::() { + if String::from(err.name()) == "NotSupportedError" { + true + } else { + false + } + } else { + false + } +} + +/// Turns a `JsValue` containing a `String` into a `Multiaddr`, if possible. +fn js_value_to_addr(addr: &JsValue) -> Result { + if let Some(addr) = addr.as_string() { + Ok(addr.parse()?) + } else { + Err(JsValue::from_str("Element in new_addrs is not a string").into()) + } +} + +/// Error that can be generated by the `ExtTransport`. +pub struct JsErr(SendWrapper); + +impl From for JsErr { + fn from(val: JsValue) -> JsErr { + JsErr(SendWrapper::new(val)) + } +} + +impl From for JsErr { + fn from(err: libp2p_core::multiaddr::Error) -> JsErr { + JsValue::from_str(&err.to_string()).into() + } +} + +impl From for io::Error { + fn from(err: JsErr) -> io::Error { + io::Error::new(io::ErrorKind::Other, err.to_string()) + } +} + +impl fmt::Debug for JsErr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self) + } +} + +impl fmt::Display for JsErr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if let Some(s) = self.0.as_string() { + write!(f, "{}", s) + } else if let Some(err) = self.0.dyn_ref::() { + write!(f, "{}", String::from(err.message())) + } else if let Some(obj) = self.0.dyn_ref::() { + write!(f, "{}", String::from(obj.to_string())) + } else { + write!(f, "{:?}", &*self.0) + } + } +} + +impl error::Error for JsErr {}