More concerns

This commit is contained in:
Pierre Krieger
2018-01-11 10:51:13 +01:00
parent 2783c5713e
commit b8829d7cb1
5 changed files with 59 additions and 42 deletions

View File

@ -8,16 +8,16 @@ or for a different operating system.
# Emscripten # Emscripten
On emscripten, you can create a `WsConfig` object with `WsConfig::new()`. It can then be used On emscripten, you can create a `BrowserWsConfig` object with `BrowserWsConfig::new()`. It can
as a transport. then be used as a transport.
Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress
which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object. which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object.
```rust ```rust
use libp2p_websocket::WsConfig; use libp2p_websocket::BrowserWsConfig;
let ws_config = WsConfig::new(); let ws_config = BrowserWsConfig::new();
// let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); // let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap());
``` ```
@ -26,7 +26,8 @@ let ws_config = WsConfig::new();
On other operating systems, this library doesn't open any socket by itself. Instead it must be On other operating systems, this library doesn't open any socket by itself. Instead it must be
plugged on top of another implementation of `Transport` such as TCP/IP. plugged on top of another implementation of `Transport` such as TCP/IP.
This underlying transport must be passed to the `WsConfig::new()` function. This underlying transport must be put inside a `WsConfig` object through the
`WsConfig::new()` function.
```rust ```rust
extern crate libp2p_swarm; extern crate libp2p_swarm;

View File

@ -38,21 +38,21 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// ///
/// > **Note**: The `/wss` protocol isn't supported. /// > **Note**: The `/wss` protocol isn't supported.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct WsConfig; pub struct BrowserWsConfig;
impl WsConfig { impl BrowserWsConfig {
/// Creates a new configuration object for websocket. /// Creates a new configuration object for websocket.
#[inline] #[inline]
pub fn new() -> WsConfig { pub fn new() -> BrowserWsConfig {
WsConfig BrowserWsConfig
} }
} }
impl Transport for WsConfig { impl Transport for BrowserWsConfig {
type RawConn = WsConn; type RawConn = BrowserWsConn;
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>; // TODO: use `!` type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>; // TODO: use `!`
type ListenerUpgrade = Box<Future<Item = Self::RawConn, Error = IoError>>; // TODO: use `!` type ListenerUpgrade = Box<Future<Item = Self::RawConn, Error = IoError>>; // TODO: use `!`
type Dial = FutureThen<oneshot::Receiver<Result<WsConn, IoError>>, Result<WsConn, IoError>, fn(Result<Result<WsConn, IoError>, oneshot::Canceled>) -> Result<WsConn, IoError>>; type Dial = FutureThen<oneshot::Receiver<Result<BrowserWsConn, IoError>>, Result<BrowserWsConn, IoError>, fn(Result<Result<BrowserWsConn, IoError>, oneshot::Canceled>) -> Result<BrowserWsConn, IoError>>;
#[inline] #[inline]
fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -107,10 +107,10 @@ impl Transport for WsConfig {
} }
}; };
// Create a `open` channel that will be used to communicate the `WsConn` that represents // Create a `open` channel that will be used to communicate the `BrowserWsConn` that represents
// the open dialing websocket. Also create a `open_cb` callback that will be used for the // the open dialing websocket. Also create a `open_cb` callback that will be used for the
// `open` message of the websocket. // `open` message of the websocket.
let (open_tx, open_rx) = oneshot::channel::<Result<WsConn, IoError>>(); let (open_tx, open_rx) = oneshot::channel::<Result<BrowserWsConn, IoError>>();
let open_tx = Arc::new(Mutex::new(Some(open_tx))); let open_tx = Arc::new(Mutex::new(Some(open_tx)));
let websocket_clone = websocket.clone(); let websocket_clone = websocket.clone();
let open_cb = { let open_cb = {
@ -124,18 +124,18 @@ impl Transport for WsConfig {
// is not supposed to happen. // is not supposed to happen.
let message_rx = message_rx.take().expect("the websocket can only open once"); let message_rx = message_rx.take().expect("the websocket can only open once");
// Send a `WsConn` to the future that was returned by `dial`. Ignoring errors that // Send a `BrowserWsConn` to the future that was returned by `dial`. Ignoring errors that
// would happen the future has been dropped by the user. // would happen the future has been dropped by the user.
let _ = tx let _ = tx
.send(Ok(WsConn { .send(Ok(BrowserWsConn {
websocket: websocket_clone.clone(), websocket: websocket_clone.clone(),
incoming_data: RwStreamSink::new(message_rx.then(|result| { incoming_data: RwStreamSink::new(message_rx.then(|result| {
// An `Err` happens here if `message_tx` has been dropped. However // An `Err` happens here if `message_tx` has been dropped. However
// `message_tx` is grabbed by the websocket, which stays alive for as // `message_tx` is grabbed by the websocket, which stays alive for as
// long as the `WsConn` is alive. // long as the `BrowserWsConn` is alive.
match result { match result {
Ok(r) => r, Ok(r) => r,
Err(_) => unreachable!("the message channel outlives the WsConn") Err(_) => unreachable!("the message channel outlives the BrowserWsConn")
} }
})), })),
})); }));
@ -191,7 +191,7 @@ impl Transport for WsConfig {
} }
} }
pub struct WsConn { pub struct BrowserWsConn {
websocket: Reference, websocket: Reference,
// Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`. // Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`.
incoming_data: RwStreamSink<StreamThen<mpsc::UnboundedReceiver<Result<Vec<u8>, IoError>>, incoming_data: RwStreamSink<StreamThen<mpsc::UnboundedReceiver<Result<Vec<u8>, IoError>>,
@ -200,7 +200,7 @@ pub struct WsConn {
>>, >>,
} }
impl Drop for WsConn { impl Drop for BrowserWsConn {
#[inline] #[inline]
fn drop(&mut self) { fn drop(&mut self) {
// TODO: apparently there's a memory leak related to callbacks? // TODO: apparently there's a memory leak related to callbacks?
@ -208,24 +208,24 @@ impl Drop for WsConn {
} }
} }
impl AsyncRead for WsConn { impl AsyncRead for BrowserWsConn {
} }
impl Read for WsConn { impl Read for BrowserWsConn {
#[inline] #[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> { fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
self.incoming_data.read(buf) self.incoming_data.read(buf)
} }
} }
impl AsyncWrite for WsConn { impl AsyncWrite for BrowserWsConn {
#[inline] #[inline]
fn shutdown(&mut self) -> Poll<(), IoError> { fn shutdown(&mut self) -> Poll<(), IoError> {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
} }
impl Write for WsConn { impl Write for BrowserWsConn {
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> { fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
let typed_array = TypedArray::from(buf); let typed_array = TypedArray::from(buf);

View File

@ -74,11 +74,9 @@ where
}; };
let (inner_listen, new_addr) = match self.transport.listen_on(inner_addr) { let (inner_listen, new_addr) = match self.transport.listen_on(inner_addr) {
Ok((listen, inner_new_addr)) => { Ok((listen, mut new_addr)) => {
// Need to suffix `/ws` to the listening address. // Need to suffix `/ws` to the listening address.
let new_addr = inner_new_addr new_addr.append(AddrComponent::WS);
.encapsulate("/ws")
.expect("the /ws suffix is always valid");
(listen, new_addr) (listen, new_addr)
} }
Err((transport, _)) => { Err((transport, _)) => {
@ -91,11 +89,9 @@ where
} }
}; };
let listen = inner_listen.map::<_, fn(_) -> _>(|(stream, client_addr)| { let listen = inner_listen.map::<_, fn(_) -> _>(|(stream, mut client_addr)| {
// Need to suffix `/ws` to each client address. // Need to suffix `/ws` to each client address.
let client_addr = client_addr client_addr.append(AddrComponent::WS);
.encapsulate("/ws")
.expect("the /ws suffix is always valid");
// Upgrade the listener to websockets like the websockets library requires us to do. // Upgrade the listener to websockets like the websockets library requires us to do.
let upgraded = stream.and_then(|stream| { let upgraded = stream.and_then(|stream| {
@ -118,8 +114,9 @@ where
OwnedMessage::Binary(data) => Ok(Some(data)), OwnedMessage::Binary(data) => Ok(Some(data)),
OwnedMessage::Text(data) => Ok(Some(data.into_bytes())), OwnedMessage::Text(data) => Ok(Some(data.into_bytes())),
OwnedMessage::Close(_) => Ok(None), OwnedMessage::Close(_) => Ok(None),
// TODO: pings and pongs ; freaking hard // TODO: handle pings and pongs, which is freaking hard
_ => unimplemented!() // for now we close the socket when that happens
_ => Ok(None)
} }
}) })
// TODO: is there a way to merge both lines into one? // TODO: is there a way to merge both lines into one?
@ -181,8 +178,9 @@ where
OwnedMessage::Binary(data) => Ok(data), OwnedMessage::Binary(data) => Ok(data),
OwnedMessage::Text(data) => Ok(data.into_bytes()), OwnedMessage::Text(data) => Ok(data.into_bytes()),
// TODO: pings and pongs and close messages need to be // TODO: pings and pongs and close messages need to be
// answered ; and this is really hard // answered ; and this is really hard ; for now we produce
_ => unimplemented!(), // an error when that happens
_ => Err(IoError::new(IoErrorKind::Other, "unimplemented")),
} }
}); });
let read_write = RwStreamSink::new(framed_data); let read_write = RwStreamSink::new(framed_data);

View File

@ -33,16 +33,16 @@
//! //!
//! # Emscripten //! # Emscripten
//! //!
//! On emscripten, you can create a `WsConfig` object with `WsConfig::new()`. It can then be used //! On emscripten, you can create a `BrowserWsConfig` object with `BrowserWsConfig::new()`. It can
//! as a transport. //! then be used as a transport.
//! //!
//! Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress //! Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress
//! which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object. //! which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object.
//! //!
//! ```ignore //! ```ignore
//! use libp2p_websocket::WsConfig; //! use libp2p_websocket::BrowserWsConfig;
//! //!
//! let ws_config = WsConfig::new(); //! let ws_config = BrowserWsConfig::new();
//! // let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); //! // let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap());
//! ``` //! ```
//! //!
@ -51,7 +51,8 @@
//! On other operating systems, this library doesn't open any socket by itself. Instead it must be //! On other operating systems, this library doesn't open any socket by itself. Instead it must be
//! plugged on top of another implementation of `Transport` such as TCP/IP. //! plugged on top of another implementation of `Transport` such as TCP/IP.
//! //!
//! This underlying transport must be passed to the `WsConfig::new()` function. //! This underlying transport must be put inside a `WsConfig` object through the
//! `WsConfig::new()` function.
//! //!
//! ``` //! ```
//! extern crate libp2p_swarm; //! extern crate libp2p_swarm;
@ -91,6 +92,6 @@ mod desktop;
mod browser; mod browser;
#[cfg(target_os = "emscripten")] #[cfg(target_os = "emscripten")]
pub use self::browser::WsConfig; pub use self::browser::{BrowserWsConfig, BrowserWsConn};
#[cfg(not(target_os = "emscripten"))] #[cfg(not(target_os = "emscripten"))]
pub use self::desktop::WsConfig; pub use self::desktop::WsConfig;

View File

@ -102,6 +102,23 @@ impl Multiaddr {
Ok(Multiaddr { bytes: bytes }) Ok(Multiaddr { bytes: bytes })
} }
/// Adds an already-parsed address component to the end of this multiaddr.
///
/// # Examples
///
/// ```
/// use multiaddr::{Multiaddr, AddrComponent};
///
/// let mut address: Multiaddr = "/ip4/127.0.0.1".parse().unwrap();
/// address.append(AddrComponent::TCP(10000));
/// assert_eq!(address, "/ip4/127.0.0.1/tcp/10000".parse().unwrap());
/// ```
///
#[inline]
pub fn append(&mut self, component: AddrComponent) {
component.write_bytes(&mut self.bytes).expect("writing to a Vec never fails")
}
/// Remove the outermost address. /// Remove the outermost address.
/// ///
/// # Examples /// # Examples