From 0a45f7310f58df41bbeda6d0d6cd09a67b28f6af Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 24 Jan 2020 16:40:48 +0100 Subject: [PATCH] Support tokio in libp2p-tcp and libp2p-uds (#1402) --- transports/tcp/Cargo.toml | 6 +- transports/tcp/src/lib.rs | 374 +++++++++++++++++++++----------------- transports/uds/Cargo.toml | 6 +- transports/uds/src/lib.rs | 56 +++--- 4 files changed, 250 insertions(+), 192 deletions(-) diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index a9e2a2aa..7a0b6022 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -10,10 +10,14 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-std = "1.0" +async-std = { version = "1.0", optional = true } futures = "0.3.1" futures-timer = "2.0" get_if_addrs = "0.5.3" ipnet = "2.0.0" libp2p-core = { version = "0.14.0-alpha.1", path = "../../core" } log = "0.4.1" +tokio = { version = "0.2", default-features = false, features = ["tcp"], optional = true } + +[features] +default = ["async-std"] diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 99ebad02..36224dad 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -22,21 +22,13 @@ //! //! # Usage //! -//! Example: +//! This crate provides two structs, `TcpConfig` and `TokioTcpConfig`, depending on which +//! features are enabled. //! -//! ``` -//! extern crate libp2p_tcp; -//! use libp2p_tcp::TcpConfig; -//! -//! # fn main() { -//! let tcp = TcpConfig::new(); -//! # } -//! ``` -//! -//! The `TcpConfig` structs implements the `Transport` trait of the `swarm` library. See the -//! documentation of `swarm` and of libp2p in general to learn how to use the `Transport` trait. +//! Both the `TcpConfig` and `TokioTcpConfig` structs implement the `Transport` trait of the +//! `core` library. See the documentation of `core` and of libp2p in general to learn how to +//! use the `Transport` trait. -use async_std::net::TcpStream; use futures::{future::{self, Ready}, prelude::*}; use futures_timer::Delay; use get_if_addrs::{IfAddr, get_if_addrs}; @@ -57,12 +49,16 @@ use std::{ time::Duration }; +macro_rules! codegen { + ($feature_name:expr, $tcp_config:ident, $tcp_trans_stream:ident, $tcp_listen_stream:ident, $apply_config:ident, $tcp_stream:ty, $tcp_listener:ty) => { + /// Represents the configuration for a TCP/IP transport capability for libp2p. /// /// The TCP sockets created by libp2p will need to be progressed by running the futures and streams /// obtained by libp2p through the tokio reactor. +#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] #[derive(Debug, Clone, Default)] -pub struct TcpConfig { +pub struct $tcp_config { /// How long a listener should sleep after receiving an error, before trying again. sleep_on_error: Duration, /// TTL to set for opened sockets, or `None` to keep default. @@ -71,10 +67,10 @@ pub struct TcpConfig { nodelay: Option, } -impl TcpConfig { +impl $tcp_config { /// Creates a new configuration object for TCP/IP. - pub fn new() -> TcpConfig { - TcpConfig { + pub fn new() -> $tcp_config { + $tcp_config { sleep_on_error: Duration::from_millis(100), ttl: None, nodelay: None, @@ -94,12 +90,12 @@ impl TcpConfig { } } -impl Transport for TcpConfig { - type Output = TcpTransStream; +impl Transport for $tcp_config { + type Output = $tcp_trans_stream; type Error = io::Error; type Listener = Pin, io::Error>> + Send>>; type ListenerUpgrade = Ready>; - type Dial = Pin> + Send>>; + type Dial = Pin> + Send>>; fn listen_on(self, addr: Multiaddr) -> Result> { let socket_addr = @@ -109,10 +105,10 @@ impl Transport for TcpConfig { return Err(TransportError::MultiaddrNotSupported(addr)) }; - async fn do_listen(cfg: TcpConfig, socket_addr: SocketAddr) - -> Result>>, io::Error>>, io::Error> + async fn do_listen(cfg: $tcp_config, socket_addr: SocketAddr) + -> Result>>, io::Error>>, io::Error> { - let listener = async_std::net::TcpListener::bind(&socket_addr).await?; + let listener = <$tcp_listener>::bind(&socket_addr).await?; let local_addr = listener.local_addr()?; let port = local_addr.port(); @@ -148,7 +144,7 @@ impl Transport for TcpConfig { } }; - let listen_stream = TcpListenStream { + let listen_stream = $tcp_listen_stream { stream: listener, pause: None, pause_duration: cfg.sleep_on_error, @@ -178,16 +174,187 @@ impl Transport for TcpConfig { debug!("Dialing {}", addr); - async fn do_dial(cfg: TcpConfig, socket_addr: SocketAddr) -> Result { - let stream = TcpStream::connect(&socket_addr).await?; - apply_config(&cfg, &stream)?; - Ok(TcpTransStream { inner: stream }) + async fn do_dial(cfg: $tcp_config, socket_addr: SocketAddr) -> Result<$tcp_trans_stream, io::Error> { + let stream = <$tcp_stream>::connect(&socket_addr).await?; + $apply_config(&cfg, &stream)?; + Ok($tcp_trans_stream { inner: stream }) } Ok(Box::pin(do_dial(self, socket_addr))) } } +/// Stream that listens on an TCP/IP address. +#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] +pub struct $tcp_listen_stream { + /// The incoming connections. + stream: $tcp_listener, + /// The current pause if any. + pause: Option, + /// How long to pause after an error. + pause_duration: Duration, + /// The port which we use as our listen port in listener event addresses. + port: u16, + /// The set of known addresses. + addrs: Addresses, + /// Temporary buffer of listener events. + pending: Buffer<$tcp_trans_stream>, + /// Original configuration. + config: $tcp_config +} + +impl $tcp_listen_stream { + /// Takes ownership of the listener, and returns the next incoming event and the listener. + async fn next(mut self) -> (Result>>, io::Error>, Self) { + loop { + if let Some(event) = self.pending.pop_front() { + return (event, self); + } + + if let Some(pause) = self.pause.take() { + let _ = pause.await; + } + + // TODO: do we get the peer_addr at the same time? + let (sock, _) = match self.stream.accept().await { + Ok(s) => s, + Err(e) => { + debug!("error accepting incoming connection: {}", e); + self.pause = Some(Delay::new(self.pause_duration)); + return (Err(e), self); + } + }; + + let sock_addr = match sock.peer_addr() { + Ok(addr) => addr, + Err(err) => { + debug!("Failed to get peer address: {:?}", err); + continue + } + }; + + let local_addr = match sock.local_addr() { + Ok(sock_addr) => { + if let Addresses::Many(ref mut addrs) = self.addrs { + if let Err(err) = check_for_interface_changes(&sock_addr, self.port, addrs, &mut self.pending) { + return (Err(err), self); + } + } + ip_to_multiaddr(sock_addr.ip(), sock_addr.port()) + } + Err(err) => { + debug!("Failed to get local address of incoming socket: {:?}", err); + continue + } + }; + + let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port()); + + match $apply_config(&self.config, &sock) { + Ok(()) => { + trace!("Incoming connection from {} at {}", remote_addr, local_addr); + self.pending.push_back(Ok(ListenerEvent::Upgrade { + upgrade: future::ok($tcp_trans_stream { inner: sock }), + local_addr, + remote_addr + })) + } + Err(err) => { + debug!("Error upgrading incoming connection from {}: {:?}", remote_addr, err); + self.pending.push_back(Ok(ListenerEvent::Upgrade { + upgrade: future::err(err), + local_addr, + remote_addr + })) + } + } + } + } +} + +/// Wraps around a `TcpStream` and adds logging for important events. +#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] +#[derive(Debug)] +pub struct $tcp_trans_stream { + inner: $tcp_stream, +} + +impl Drop for $tcp_trans_stream { + fn drop(&mut self) { + if let Ok(addr) = self.inner.peer_addr() { + debug!("Dropped TCP connection to {:?}", addr); + } else { + debug!("Dropped TCP connection to undeterminate peer"); + } + } +} + +/// Applies the socket configuration parameters to a socket. +fn $apply_config(config: &$tcp_config, socket: &$tcp_stream) -> Result<(), io::Error> { + if let Some(ttl) = config.ttl { + socket.set_ttl(ttl)?; + } + + if let Some(nodelay) = config.nodelay { + socket.set_nodelay(nodelay)?; + } + + Ok(()) +} + +}; +} + +#[cfg(feature = "async-std")] +codegen!("async-std", TcpConfig, TcpTransStream, TcpListenStream, apply_config_async_std, async_std::net::TcpStream, async_std::net::TcpListener); + +#[cfg(feature = "tokio")] +codegen!("tokio", TokioTcpConfig, TokioTcpTransStream, TokioTcpListenStream, apply_config_tokio, tokio::net::TcpStream, tokio::net::TcpListener); + +#[cfg(feature = "async-std")] +impl AsyncRead for TcpTransStream { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf) + } +} + +#[cfg(feature = "async-std")] +impl AsyncWrite for TcpTransStream { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + AsyncWrite::poll_close(Pin::new(&mut self.inner), cx) + } +} + +#[cfg(feature = "tokio")] +impl AsyncRead for TokioTcpTransStream { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + tokio::io::AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf) + } +} + +#[cfg(feature = "tokio")] +impl AsyncWrite for TokioTcpTransStream { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.inner), cx) + } +} + // This type of logic should probably be moved into the multiaddr package fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { let mut iter = addr.iter(); @@ -240,19 +407,6 @@ fn host_addresses(port: u16) -> io::Result> { Ok(addrs) } -/// Applies the socket configuration parameters to a socket. -fn apply_config(config: &TcpConfig, socket: &TcpStream) -> Result<(), io::Error> { - if let Some(ttl) = config.ttl { - socket.set_ttl(ttl)?; - } - - if let Some(nodelay) = config.nodelay { - socket.set_nodelay(nodelay)?; - } - - Ok(()) -} - /// Listen address information. #[derive(Debug)] enum Addresses { @@ -262,34 +416,16 @@ enum Addresses { Many(Vec<(IpAddr, IpNet, Multiaddr)>) } -type Buffer = VecDeque>>, io::Error>>; - -/// Stream that listens on an TCP/IP address. -pub struct TcpListenStream { - /// The incoming connections. - stream: async_std::net::TcpListener, - /// The current pause if any. - pause: Option, - /// How long to pause after an error. - pause_duration: Duration, - /// The port which we use as our listen port in listener event addresses. - port: u16, - /// The set of known addresses. - addrs: Addresses, - /// Temporary buffer of listener events. - pending: Buffer, - /// Original configuration. - config: TcpConfig -} +type Buffer = VecDeque>>, io::Error>>; // If we listen on all interfaces, find out to which interface the given // socket address belongs. In case we think the address is new, check // all host interfaces again and report new and expired listen addresses. -fn check_for_interface_changes( +fn check_for_interface_changes( socket_addr: &SocketAddr, listen_port: u16, listen_addrs: &mut Vec<(IpAddr, IpNet, Multiaddr)>, - pending: &mut Buffer + pending: &mut Buffer ) -> Result<(), io::Error> { // Check for exact match: if listen_addrs.iter().find(|(ip, ..)| ip == &socket_addr.ip()).is_some() { @@ -337,119 +473,17 @@ fn check_for_interface_changes( Ok(()) } -impl TcpListenStream { - /// Takes ownership of the listener, and returns the next incoming event and the listener. - async fn next(mut self) -> (Result>>, io::Error>, Self) { - loop { - if let Some(event) = self.pending.pop_front() { - return (event, self); - } - - if let Some(pause) = self.pause.take() { - let _ = pause.await; - } - - // TODO: do we get the peer_addr at the same time? - let (sock, _) = match self.stream.accept().await { - Ok(s) => s, - Err(e) => { - debug!("error accepting incoming connection: {}", e); - self.pause = Some(Delay::new(self.pause_duration)); - return (Err(e), self); - } - }; - - let sock_addr = match sock.peer_addr() { - Ok(addr) => addr, - Err(err) => { - debug!("Failed to get peer address: {:?}", err); - continue - } - }; - - let local_addr = match sock.local_addr() { - Ok(sock_addr) => { - if let Addresses::Many(ref mut addrs) = self.addrs { - if let Err(err) = check_for_interface_changes(&sock_addr, self.port, addrs, &mut self.pending) { - return (Err(err), self); - } - } - ip_to_multiaddr(sock_addr.ip(), sock_addr.port()) - } - Err(err) => { - debug!("Failed to get local address of incoming socket: {:?}", err); - continue - } - }; - - let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port()); - - match apply_config(&self.config, &sock) { - Ok(()) => { - trace!("Incoming connection from {} at {}", remote_addr, local_addr); - self.pending.push_back(Ok(ListenerEvent::Upgrade { - upgrade: future::ok(TcpTransStream { inner: sock }), - local_addr, - remote_addr - })) - } - Err(err) => { - debug!("Error upgrading incoming connection from {}: {:?}", remote_addr, err); - self.pending.push_back(Ok(ListenerEvent::Upgrade { - upgrade: future::err(err), - local_addr, - remote_addr - })) - } - } - } - } -} - -/// Wraps around a `TcpStream` and adds logging for important events. -#[derive(Debug)] -pub struct TcpTransStream { - inner: TcpStream, -} - -impl AsyncRead for TcpTransStream { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { - AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf) - } -} - -impl AsyncWrite for TcpTransStream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - AsyncWrite::poll_close(Pin::new(&mut self.inner), cx) - } -} - -impl Drop for TcpTransStream { - fn drop(&mut self) { - if let Ok(addr) = self.inner.peer_addr() { - debug!("Dropped TCP connection to {:?}", addr); - } else { - debug!("Dropped TCP connection to undeterminate peer"); - } - } -} - #[cfg(test)] mod tests { use futures::prelude::*; use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use super::{multiaddr_to_socketaddr, TcpConfig}; + use super::multiaddr_to_socketaddr; + #[cfg(feature = "async-std")] + use super::TcpConfig; #[test] + #[cfg(feature = "async-std")] fn wildcard_expansion() { let mut listener = TcpConfig::new() .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) @@ -539,6 +573,7 @@ mod tests { } #[test] + #[cfg(feature = "async-std")] fn communicating_between_dialer_and_listener() { let (ready_tx, ready_rx) = futures::channel::oneshot::channel(); let mut ready_tx = Some(ready_tx); @@ -580,6 +615,7 @@ mod tests { } #[test] + #[cfg(feature = "async-std")] fn replace_port_0_in_returned_multiaddr_ipv4() { let tcp = TcpConfig::new(); @@ -597,6 +633,7 @@ mod tests { } #[test] + #[cfg(feature = "async-std")] fn replace_port_0_in_returned_multiaddr_ipv6() { let tcp = TcpConfig::new(); @@ -614,6 +651,7 @@ mod tests { } #[test] + #[cfg(feature = "async-std")] fn larger_addr_denied() { let tcp = TcpConfig::new(); diff --git a/transports/uds/Cargo.toml b/transports/uds/Cargo.toml index ad4f8ff7..a8e3c28d 100644 --- a/transports/uds/Cargo.toml +++ b/transports/uds/Cargo.toml @@ -10,10 +10,14 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [target.'cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))'.dependencies] -async-std = "1.0" +async-std = { version = "1.0", optional = true } libp2p-core = { version = "0.14.0-alpha.1", path = "../../core" } log = "0.4.1" futures = "0.3.1" +tokio = { version = "0.2", default-features = false, features = ["uds"], optional = true } [target.'cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))'.dev-dependencies] tempfile = "3.0" + +[features] +default = ["async-std"] diff --git a/transports/uds/src/lib.rs b/transports/uds/src/lib.rs index dccee622..24c8c5d2 100644 --- a/transports/uds/src/lib.rs +++ b/transports/uds/src/lib.rs @@ -28,23 +28,12 @@ //! //! The `UdsConfig` transport supports multiaddresses of the form `/unix//tmp/foo`. //! -//! Example: -//! -//! ``` -//! extern crate libp2p_uds; -//! use libp2p_uds::UdsConfig; -//! -//! # fn main() { -//! let uds = UdsConfig::new(); -//! # } -//! ``` -//! //! The `UdsConfig` structs implements the `Transport` trait of the `core` library. See the //! documentation of `core` and of libp2p in general to learn how to use the `Transport` trait. #![cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))] +#![cfg_attr(docsrs, doc(cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))))] -use async_std::os::unix::net::{UnixListener, UnixStream}; use futures::{prelude::*, future::{BoxFuture, Ready}}; use futures::stream::BoxStream; use libp2p_core::{ @@ -55,20 +44,24 @@ use libp2p_core::{ use log::debug; use std::{io, path::PathBuf}; +macro_rules! codegen { + ($feature_name:expr, $uds_config:ident, $build_listener:expr, $unix_stream:ty, $($mut_or_not:tt)*) => { + /// Represents the configuration for a Unix domain sockets transport capability for libp2p. +#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] #[derive(Debug, Clone)] -pub struct UdsConfig { +pub struct $uds_config { } -impl UdsConfig { +impl $uds_config { /// Creates a new configuration object for Unix domain sockets. - pub fn new() -> UdsConfig { - UdsConfig {} + pub fn new() -> $uds_config { + $uds_config {} } } -impl Transport for UdsConfig { - type Output = UnixStream; +impl Transport for $uds_config { + type Output = $unix_stream; type Error = io::Error; type Listener = BoxStream<'static, Result, Self::Error>>; type ListenerUpgrade = Ready>; @@ -76,7 +69,7 @@ impl Transport for UdsConfig { fn listen_on(self, addr: Multiaddr) -> Result> { if let Ok(path) = multiaddr_to_path(&addr) { - Ok(async move { UnixListener::bind(&path).await } + Ok(async move { $build_listener(&path).await } .map_ok(move |listener| { stream::once({ let addr = addr.clone(); @@ -84,7 +77,7 @@ impl Transport for UdsConfig { debug!("Now listening on {}", addr); Ok(ListenerEvent::NewAddress(addr)) } - }).chain(stream::unfold(listener, move |listener| { + }).chain(stream::unfold(listener, move |$($mut_or_not)* listener| { let addr = addr.clone(); async move { let (stream, _) = match listener.accept().await { @@ -111,13 +104,32 @@ impl Transport for UdsConfig { fn dial(self, addr: Multiaddr) -> Result> { if let Ok(path) = multiaddr_to_path(&addr) { debug!("Dialing {}", addr); - Ok(async move { UnixStream::connect(&path).await }.boxed()) + Ok(async move { <$unix_stream>::connect(&path).await }.boxed()) } else { Err(TransportError::MultiaddrNotSupported(addr)) } } } +}; +} + +#[cfg(feature = "async-std")] +codegen!( + "async-std", + UdsConfig, + |addr| async move { async_std::os::unix::net::UnixListener::bind(addr).await }, + async_std::os::unix::net::UnixStream, +); +#[cfg(feature = "tokio")] +codegen!( + "tokio", + TokioUdsConfig, + |addr| async move { tokio::net::UnixListener::bind(addr) }, + tokio::net::UnixStream, + mut +); + /// Turns a `Multiaddr` containing a single `Unix` component into a path. /// /// Also returns an error if the path is not absolute, as we don't want to dial/listen on relative @@ -143,7 +155,7 @@ fn multiaddr_to_path(addr: &Multiaddr) -> Result { Ok(out) } -#[cfg(test)] +#[cfg(all(test, feature = "async-std"))] mod tests { use super::{multiaddr_to_path, UdsConfig}; use futures::{channel::oneshot, prelude::*};