diff --git a/Cargo.toml b/Cargo.toml index 68382b27..a27f3b70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "core", "tcp-transport", "transport-timeout", + "uds", "varint-rs", "websocket", "yamux" diff --git a/README.md b/README.md index d1d4123b..98e1091b 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ Architecture of the other crates of this repository: - `rw-stream-sink`: Utility library that makes it possible to wrap around a tokio `Stream + Sink` of bytes and implements `AsyncRead + AsyncWrite`. - `secio`: Implementation of the `secio` protocol. Encrypts communications. Implements the `ConnectionUpgrade` trait of `core`. - `tcp-transport`: Implementation of the `Transport` trait of `core` for TCP/IP. +- `uds`: Implementation of `Transport` for UNIX domain sockets. - `varint`: encoding and decoding state machines for protobuf varints. - `websocket`: Implementation of the `Transport` trait of `core` for Websockets. diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 7c7fae85..517eb68e 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -22,6 +22,7 @@ libp2p-ratelimit = { path = "../ratelimit" } libp2p-relay = { path = "../relay" } libp2p-core = { path = "../core" } libp2p-transport-timeout = { path = "../transport-timeout" } +libp2p-uds = { path = "../uds" } libp2p-websocket = { path = "../websocket" } libp2p-yamux = { path = "../yamux" } tokio-codec = "0.1" diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 8789edd2..484342e6 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -42,6 +42,7 @@ pub extern crate libp2p_secio as secio; #[cfg(not(target_os = "emscripten"))] pub extern crate libp2p_tcp_transport as tcp; pub extern crate libp2p_transport_timeout as transport_timeout; +pub extern crate libp2p_uds as uds; pub extern crate libp2p_websocket as websocket; pub extern crate libp2p_yamux as yamux; diff --git a/uds/Cargo.toml b/uds/Cargo.toml new file mode 100644 index 00000000..ac20bf63 --- /dev/null +++ b/uds/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "libp2p-uds" +version = "0.1.0" +authors = ["Parity Technologies "] +description = "Unix domain sockets transport for libp2p" + +[target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies] +libp2p-core = { path = "../core" } +log = "0.4.1" +futures = "0.1" +multiaddr = { path = "../multiaddr" } +tokio-uds = "0.2" + +[target.'cfg(all(unix, not(target_os = "emscripten")))'.dev-dependencies] +tempfile = "3.0" +tokio-current-thread = "0.1" +tokio-io = "0.1" diff --git a/uds/src/lib.rs b/uds/src/lib.rs new file mode 100644 index 00000000..1c475b24 --- /dev/null +++ b/uds/src/lib.rs @@ -0,0 +1,238 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the libp2p `Transport` trait for Unix domain sockets. +//! +//! Uses [the *tokio* library](https://tokio.rs). +//! +//! # Platform support +//! +//! This transport only works on Unix platforms. +//! +//! # Usage +//! +//! The `UdsConfig` transport supports multiaddresses of the form `/unix//tmp/foo`. +//! +//! Example: +//! +//! ``` +//! extern crate libp2p_uds; +//! use libp2p_uds::UdsConfig; +//! +//! # fn main() { +//! let uds = UdsConfig::new(); +//! # } +//! ``` +//! +//! The `UdsConfig` structs implements the `Transport` trait of the `core` library. See the +//! documentation of `core` and of libp2p in general to learn how to use the `Transport` trait. + +#![cfg(all(unix, not(target_os = "emscripten")))] + +extern crate futures; +extern crate libp2p_core; +#[macro_use] +extern crate log; +extern crate multiaddr; +extern crate tokio_uds; + +#[cfg(test)] +extern crate tempfile; +#[cfg(test)] +extern crate tokio_current_thread; +#[cfg(test)] +extern crate tokio_io; + +use futures::future::{self, Future, FutureResult}; +use futures::stream::Stream; +use multiaddr::{AddrComponent, Multiaddr}; +use std::io::Error as IoError; +use std::path::PathBuf; +use libp2p_core::Transport; +use tokio_uds::{UnixListener, UnixStream}; + +/// Represents the configuration for a Unix domain sockets transport capability for libp2p. +/// +/// The Unixs sockets created by libp2p will need to be progressed by running the futures and +/// streams obtained by libp2p through the tokio reactor. +#[derive(Debug, Clone)] +pub struct UdsConfig { +} + +impl UdsConfig { + /// Creates a new configuration object for TCP/IP. + #[inline] + pub fn new() -> UdsConfig { + UdsConfig {} + } +} + +impl Transport for UdsConfig { + type Output = UnixStream; + type Listener = Box>; + type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; + type MultiaddrFuture = FutureResult; + type Dial = Box>; + + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + if let Ok(path) = multiaddr_to_path(&addr) { + let listener = UnixListener::bind(&path); + // We need to build the `Multiaddr` to return from this function. If an error happened, + // just return the original multiaddr. + match listener { + Ok(_) => {}, + Err(_) => return Err((self, addr)), + }; + + debug!("Now listening on {}", addr); + let new_addr = addr.clone(); + + let future = future::result(listener) + .map(move |listener| { + // Pull out a stream of sockets for incoming connections + listener.incoming().map(move |sock| { + debug!("Incoming connection on {}", addr); + future::ok((sock, future::ok(addr.clone()))) + }) + }) + .flatten_stream(); + Ok((Box::new(future), new_addr)) + } else { + Err((self, addr)) + } + } + + fn dial(self, addr: Multiaddr) -> Result { + if let Ok(path) = multiaddr_to_path(&addr) { + debug!("Dialing {}", addr); + let fut = UnixStream::connect(&path).map(|t| (t, future::ok(addr))); + Ok(Box::new(fut) as Box<_>) + } else { + Err((self, addr)) + } + } + + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + if server == observed { + Some(observed.clone()) + } else { + None + } + } +} + +// This type of logic should probably be moved into the multiaddr package +fn multiaddr_to_path(addr: &Multiaddr) -> Result { + let mut iter = addr.iter(); + let path = iter.next(); + + if iter.next().is_some() { + return Err(()); + } + + match path { + Some(AddrComponent::UNIX(ref path)) => Ok(path.into()), + _ => Err(()) + } +} + +#[cfg(test)] +mod tests { + use super::{multiaddr_to_path, UdsConfig}; + use futures::stream::Stream; + use futures::Future; + use multiaddr::{AddrComponent, Multiaddr}; + use std; + use std::path::Path; + use libp2p_core::Transport; + use tempfile; + use tokio_current_thread; + use tokio_io; + + #[test] + fn multiaddr_to_path_conversion() { + assert!( + multiaddr_to_path(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()) + .is_err() + ); + + assert_eq!( + multiaddr_to_path(&Multiaddr::from(AddrComponent::UNIX("/tmp/foo".into()))), + Ok(Path::new("/tmp/foo").to_owned()) + ); + assert_eq!( + multiaddr_to_path(&Multiaddr::from(AddrComponent::UNIX("/home/bar/baz".into()))), + Ok(Path::new("/home/bar/baz").to_owned()) + ); + } + + #[test] + fn communicating_between_dialer_and_listener() { + use std::io::Write; + + let temp_dir = tempfile::tempdir().unwrap(); + let socket = temp_dir.path().join("socket"); + let addr = Multiaddr::from(AddrComponent::UNIX(socket.to_string_lossy().into_owned())); + let addr2 = addr.clone(); + + std::thread::spawn(move || { + let tcp = UdsConfig::new(); + let listener = tcp.listen_on(addr2).unwrap().0.for_each(|sock| { + sock.and_then(|(sock, _)| { + // Define what to do with the socket that just connected to us + // Which in this case is read 3 bytes + let handle_conn = tokio_io::io::read_exact(sock, [0; 3]) + .map(|(_, buf)| assert_eq!(buf, [1, 2, 3])) + .map_err(|err| panic!("IO error {:?}", err)); + + // Spawn the future as a concurrent task + tokio_current_thread::spawn(handle_conn); + + Ok(()) + }) + }); + + tokio_current_thread::block_on_all(listener).unwrap(); + }); + std::thread::sleep(std::time::Duration::from_millis(100)); + let tcp = UdsConfig::new(); + // Obtain a future socket through dialing + let socket = tcp.dial(addr.clone()).unwrap(); + // Define what to do with the socket once it's obtained + let action = socket.then(|sock| -> Result<(), ()> { + sock.unwrap().0.write(&[0x1, 0x2, 0x3]).unwrap(); + Ok(()) + }); + // Execute the future in our event loop + tokio_current_thread::block_on_all(action).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + #[test] + #[ignore] // TODO: for the moment unix addresses fail to parse + fn larger_addr_denied() { + let tcp = UdsConfig::new(); + + let addr = "/ip4/127.0.0.1/tcp/12345/unix//foo/bar" + .parse::() + .unwrap(); + assert!(tcp.listen_on(addr).is_err()); + } +}