Support tokio in libp2p-tcp and libp2p-uds (#1402)

This commit is contained in:
Pierre Krieger
2020-01-24 16:40:48 +01:00
committed by GitHub
parent 37c7d73b11
commit 0a45f7310f
4 changed files with 250 additions and 192 deletions

View File

@ -22,21 +22,13 @@
//!
//! # Usage
//!
//! Example:
//! This crate provides two structs, `TcpConfig` and `TokioTcpConfig`, depending on which
//! features are enabled.
//!
//! ```
//! extern crate libp2p_tcp;
//! use libp2p_tcp::TcpConfig;
//!
//! # fn main() {
//! let tcp = TcpConfig::new();
//! # }
//! ```
//!
//! The `TcpConfig` structs implements the `Transport` trait of the `swarm` library. See the
//! documentation of `swarm` and of libp2p in general to learn how to use the `Transport` trait.
//! Both the `TcpConfig` and `TokioTcpConfig` structs implement the `Transport` trait of the
//! `core` library. See the documentation of `core` and of libp2p in general to learn how to
//! use the `Transport` trait.
use async_std::net::TcpStream;
use futures::{future::{self, Ready}, prelude::*};
use futures_timer::Delay;
use get_if_addrs::{IfAddr, get_if_addrs};
@ -57,12 +49,16 @@ use std::{
time::Duration
};
macro_rules! codegen {
($feature_name:expr, $tcp_config:ident, $tcp_trans_stream:ident, $tcp_listen_stream:ident, $apply_config:ident, $tcp_stream:ty, $tcp_listener:ty) => {
/// Represents the configuration for a TCP/IP transport capability for libp2p.
///
/// The TCP sockets created by libp2p will need to be progressed by running the futures and streams
/// obtained by libp2p through the tokio reactor.
#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))]
#[derive(Debug, Clone, Default)]
pub struct TcpConfig {
pub struct $tcp_config {
/// How long a listener should sleep after receiving an error, before trying again.
sleep_on_error: Duration,
/// TTL to set for opened sockets, or `None` to keep default.
@ -71,10 +67,10 @@ pub struct TcpConfig {
nodelay: Option<bool>,
}
impl TcpConfig {
impl $tcp_config {
/// Creates a new configuration object for TCP/IP.
pub fn new() -> TcpConfig {
TcpConfig {
pub fn new() -> $tcp_config {
$tcp_config {
sleep_on_error: Duration::from_millis(100),
ttl: None,
nodelay: None,
@ -94,12 +90,12 @@ impl TcpConfig {
}
}
impl Transport for TcpConfig {
type Output = TcpTransStream;
impl Transport for $tcp_config {
type Output = $tcp_trans_stream;
type Error = io::Error;
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade>, io::Error>> + Send>>;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
type Dial = Pin<Box<dyn Future<Output = Result<TcpTransStream, io::Error>> + Send>>;
type Dial = Pin<Box<dyn Future<Output = Result<$tcp_trans_stream, io::Error>> + Send>>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let socket_addr =
@ -109,10 +105,10 @@ impl Transport for TcpConfig {
return Err(TransportError::MultiaddrNotSupported(addr))
};
async fn do_listen(cfg: TcpConfig, socket_addr: SocketAddr)
-> Result<impl Stream<Item = Result<ListenerEvent<Ready<Result<TcpTransStream, io::Error>>>, io::Error>>, io::Error>
async fn do_listen(cfg: $tcp_config, socket_addr: SocketAddr)
-> Result<impl Stream<Item = Result<ListenerEvent<Ready<Result<$tcp_trans_stream, io::Error>>>, io::Error>>, io::Error>
{
let listener = async_std::net::TcpListener::bind(&socket_addr).await?;
let listener = <$tcp_listener>::bind(&socket_addr).await?;
let local_addr = listener.local_addr()?;
let port = local_addr.port();
@ -148,7 +144,7 @@ impl Transport for TcpConfig {
}
};
let listen_stream = TcpListenStream {
let listen_stream = $tcp_listen_stream {
stream: listener,
pause: None,
pause_duration: cfg.sleep_on_error,
@ -178,16 +174,187 @@ impl Transport for TcpConfig {
debug!("Dialing {}", addr);
async fn do_dial(cfg: TcpConfig, socket_addr: SocketAddr) -> Result<TcpTransStream, io::Error> {
let stream = TcpStream::connect(&socket_addr).await?;
apply_config(&cfg, &stream)?;
Ok(TcpTransStream { inner: stream })
async fn do_dial(cfg: $tcp_config, socket_addr: SocketAddr) -> Result<$tcp_trans_stream, io::Error> {
let stream = <$tcp_stream>::connect(&socket_addr).await?;
$apply_config(&cfg, &stream)?;
Ok($tcp_trans_stream { inner: stream })
}
Ok(Box::pin(do_dial(self, socket_addr)))
}
}
/// Stream that listens on an TCP/IP address.
#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))]
pub struct $tcp_listen_stream {
/// The incoming connections.
stream: $tcp_listener,
/// The current pause if any.
pause: Option<Delay>,
/// How long to pause after an error.
pause_duration: Duration,
/// The port which we use as our listen port in listener event addresses.
port: u16,
/// The set of known addresses.
addrs: Addresses,
/// Temporary buffer of listener events.
pending: Buffer<$tcp_trans_stream>,
/// Original configuration.
config: $tcp_config
}
impl $tcp_listen_stream {
/// Takes ownership of the listener, and returns the next incoming event and the listener.
async fn next(mut self) -> (Result<ListenerEvent<Ready<Result<$tcp_trans_stream, io::Error>>>, io::Error>, Self) {
loop {
if let Some(event) = self.pending.pop_front() {
return (event, self);
}
if let Some(pause) = self.pause.take() {
let _ = pause.await;
}
// TODO: do we get the peer_addr at the same time?
let (sock, _) = match self.stream.accept().await {
Ok(s) => s,
Err(e) => {
debug!("error accepting incoming connection: {}", e);
self.pause = Some(Delay::new(self.pause_duration));
return (Err(e), self);
}
};
let sock_addr = match sock.peer_addr() {
Ok(addr) => addr,
Err(err) => {
debug!("Failed to get peer address: {:?}", err);
continue
}
};
let local_addr = match sock.local_addr() {
Ok(sock_addr) => {
if let Addresses::Many(ref mut addrs) = self.addrs {
if let Err(err) = check_for_interface_changes(&sock_addr, self.port, addrs, &mut self.pending) {
return (Err(err), self);
}
}
ip_to_multiaddr(sock_addr.ip(), sock_addr.port())
}
Err(err) => {
debug!("Failed to get local address of incoming socket: {:?}", err);
continue
}
};
let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port());
match $apply_config(&self.config, &sock) {
Ok(()) => {
trace!("Incoming connection from {} at {}", remote_addr, local_addr);
self.pending.push_back(Ok(ListenerEvent::Upgrade {
upgrade: future::ok($tcp_trans_stream { inner: sock }),
local_addr,
remote_addr
}))
}
Err(err) => {
debug!("Error upgrading incoming connection from {}: {:?}", remote_addr, err);
self.pending.push_back(Ok(ListenerEvent::Upgrade {
upgrade: future::err(err),
local_addr,
remote_addr
}))
}
}
}
}
}
/// Wraps around a `TcpStream` and adds logging for important events.
#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))]
#[derive(Debug)]
pub struct $tcp_trans_stream {
inner: $tcp_stream,
}
impl Drop for $tcp_trans_stream {
fn drop(&mut self) {
if let Ok(addr) = self.inner.peer_addr() {
debug!("Dropped TCP connection to {:?}", addr);
} else {
debug!("Dropped TCP connection to undeterminate peer");
}
}
}
/// Applies the socket configuration parameters to a socket.
fn $apply_config(config: &$tcp_config, socket: &$tcp_stream) -> Result<(), io::Error> {
if let Some(ttl) = config.ttl {
socket.set_ttl(ttl)?;
}
if let Some(nodelay) = config.nodelay {
socket.set_nodelay(nodelay)?;
}
Ok(())
}
};
}
#[cfg(feature = "async-std")]
codegen!("async-std", TcpConfig, TcpTransStream, TcpListenStream, apply_config_async_std, async_std::net::TcpStream, async_std::net::TcpListener);
#[cfg(feature = "tokio")]
codegen!("tokio", TokioTcpConfig, TokioTcpTransStream, TokioTcpListenStream, apply_config_tokio, tokio::net::TcpStream, tokio::net::TcpListener);
#[cfg(feature = "async-std")]
impl AsyncRead for TcpTransStream {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf)
}
}
#[cfg(feature = "async-std")]
impl AsyncWrite for TcpTransStream {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_close(Pin::new(&mut self.inner), cx)
}
}
#[cfg(feature = "tokio")]
impl AsyncRead for TokioTcpTransStream {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
tokio::io::AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf)
}
}
#[cfg(feature = "tokio")]
impl AsyncWrite for TokioTcpTransStream {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.inner), cx)
}
}
// This type of logic should probably be moved into the multiaddr package
fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
let mut iter = addr.iter();
@ -240,19 +407,6 @@ fn host_addresses(port: u16) -> io::Result<Vec<(IpAddr, IpNet, Multiaddr)>> {
Ok(addrs)
}
/// Applies the socket configuration parameters to a socket.
fn apply_config(config: &TcpConfig, socket: &TcpStream) -> Result<(), io::Error> {
if let Some(ttl) = config.ttl {
socket.set_ttl(ttl)?;
}
if let Some(nodelay) = config.nodelay {
socket.set_nodelay(nodelay)?;
}
Ok(())
}
/// Listen address information.
#[derive(Debug)]
enum Addresses {
@ -262,34 +416,16 @@ enum Addresses {
Many(Vec<(IpAddr, IpNet, Multiaddr)>)
}
type Buffer = VecDeque<Result<ListenerEvent<Ready<Result<TcpTransStream, io::Error>>>, io::Error>>;
/// Stream that listens on an TCP/IP address.
pub struct TcpListenStream {
/// The incoming connections.
stream: async_std::net::TcpListener,
/// The current pause if any.
pause: Option<Delay>,
/// How long to pause after an error.
pause_duration: Duration,
/// The port which we use as our listen port in listener event addresses.
port: u16,
/// The set of known addresses.
addrs: Addresses,
/// Temporary buffer of listener events.
pending: Buffer,
/// Original configuration.
config: TcpConfig
}
type Buffer<T> = VecDeque<Result<ListenerEvent<Ready<Result<T, io::Error>>>, io::Error>>;
// If we listen on all interfaces, find out to which interface the given
// socket address belongs. In case we think the address is new, check
// all host interfaces again and report new and expired listen addresses.
fn check_for_interface_changes(
fn check_for_interface_changes<T>(
socket_addr: &SocketAddr,
listen_port: u16,
listen_addrs: &mut Vec<(IpAddr, IpNet, Multiaddr)>,
pending: &mut Buffer
pending: &mut Buffer<T>
) -> Result<(), io::Error> {
// Check for exact match:
if listen_addrs.iter().find(|(ip, ..)| ip == &socket_addr.ip()).is_some() {
@ -337,119 +473,17 @@ fn check_for_interface_changes(
Ok(())
}
impl TcpListenStream {
/// Takes ownership of the listener, and returns the next incoming event and the listener.
async fn next(mut self) -> (Result<ListenerEvent<Ready<Result<TcpTransStream, io::Error>>>, io::Error>, Self) {
loop {
if let Some(event) = self.pending.pop_front() {
return (event, self);
}
if let Some(pause) = self.pause.take() {
let _ = pause.await;
}
// TODO: do we get the peer_addr at the same time?
let (sock, _) = match self.stream.accept().await {
Ok(s) => s,
Err(e) => {
debug!("error accepting incoming connection: {}", e);
self.pause = Some(Delay::new(self.pause_duration));
return (Err(e), self);
}
};
let sock_addr = match sock.peer_addr() {
Ok(addr) => addr,
Err(err) => {
debug!("Failed to get peer address: {:?}", err);
continue
}
};
let local_addr = match sock.local_addr() {
Ok(sock_addr) => {
if let Addresses::Many(ref mut addrs) = self.addrs {
if let Err(err) = check_for_interface_changes(&sock_addr, self.port, addrs, &mut self.pending) {
return (Err(err), self);
}
}
ip_to_multiaddr(sock_addr.ip(), sock_addr.port())
}
Err(err) => {
debug!("Failed to get local address of incoming socket: {:?}", err);
continue
}
};
let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port());
match apply_config(&self.config, &sock) {
Ok(()) => {
trace!("Incoming connection from {} at {}", remote_addr, local_addr);
self.pending.push_back(Ok(ListenerEvent::Upgrade {
upgrade: future::ok(TcpTransStream { inner: sock }),
local_addr,
remote_addr
}))
}
Err(err) => {
debug!("Error upgrading incoming connection from {}: {:?}", remote_addr, err);
self.pending.push_back(Ok(ListenerEvent::Upgrade {
upgrade: future::err(err),
local_addr,
remote_addr
}))
}
}
}
}
}
/// Wraps around a `TcpStream` and adds logging for important events.
#[derive(Debug)]
pub struct TcpTransStream {
inner: TcpStream,
}
impl AsyncRead for TcpTransStream {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf)
}
}
impl AsyncWrite for TcpTransStream {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), io::Error>> {
AsyncWrite::poll_close(Pin::new(&mut self.inner), cx)
}
}
impl Drop for TcpTransStream {
fn drop(&mut self) {
if let Ok(addr) = self.inner.peer_addr() {
debug!("Dropped TCP connection to {:?}", addr);
} else {
debug!("Dropped TCP connection to undeterminate peer");
}
}
}
#[cfg(test)]
mod tests {
use futures::prelude::*;
use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use super::{multiaddr_to_socketaddr, TcpConfig};
use super::multiaddr_to_socketaddr;
#[cfg(feature = "async-std")]
use super::TcpConfig;
#[test]
#[cfg(feature = "async-std")]
fn wildcard_expansion() {
let mut listener = TcpConfig::new()
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
@ -539,6 +573,7 @@ mod tests {
}
#[test]
#[cfg(feature = "async-std")]
fn communicating_between_dialer_and_listener() {
let (ready_tx, ready_rx) = futures::channel::oneshot::channel();
let mut ready_tx = Some(ready_tx);
@ -580,6 +615,7 @@ mod tests {
}
#[test]
#[cfg(feature = "async-std")]
fn replace_port_0_in_returned_multiaddr_ipv4() {
let tcp = TcpConfig::new();
@ -597,6 +633,7 @@ mod tests {
}
#[test]
#[cfg(feature = "async-std")]
fn replace_port_0_in_returned_multiaddr_ipv6() {
let tcp = TcpConfig::new();
@ -614,6 +651,7 @@ mod tests {
}
#[test]
#[cfg(feature = "async-std")]
fn larger_addr_denied() {
let tcp = TcpConfig::new();