Expand wildcard IP addresses in TCP transport. (#1044)

Wildcard IP addresses (e.g. 0.0.0.0) are used to listen on all host
interfaces. To report those addresses such that clients know about them
and can actually make use of them we use the `get_if_addrs` crate and
maintain a collection of addresses. We report the whole expansion at the
very beginning of the listener stream with `ListenerEvent::NewAddress`
events and add new addresses should they come to our attention.

What remains to be done is to potentially allow users to filter IP
addresses, for example the local loopback one, and to detect expired
addresses not only if a new address is discovered.
This commit is contained in:
Toralf Wittner 2019-04-11 22:51:07 +02:00 committed by GitHub
parent 6e0a38bb4a
commit 05a74aed43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 294 additions and 109 deletions

View File

@ -288,6 +288,15 @@ impl<T> ListenerEvent<T> {
} }
} }
/// Returns `true` if this is an `Upgrade` listener event.
pub fn is_upgrade(&self) -> bool {
if let ListenerEvent::Upgrade {..} = self {
true
} else {
false
}
}
/// Try to turn this listener event into upgrade parts. /// Try to turn this listener event into upgrade parts.
/// ///
/// Returns `None` if the event is not actually an upgrade, /// Returns `None` if the event is not actually an upgrade,
@ -300,6 +309,15 @@ impl<T> ListenerEvent<T> {
} }
} }
/// Returns `true` if this is a `NewAddress` listener event.
pub fn is_new_address(&self) -> bool {
if let ListenerEvent::NewAddress(_) = self {
true
} else {
false
}
}
/// Try to turn this listener event into the `NewAddress` part. /// Try to turn this listener event into the `NewAddress` part.
/// ///
/// Returns `None` if the event is not actually a `NewAddress`, /// Returns `None` if the event is not actually a `NewAddress`,
@ -312,6 +330,15 @@ impl<T> ListenerEvent<T> {
} }
} }
/// Returns `true` if this is an `AddressExpired` listener event.
pub fn is_address_expired(&self) -> bool {
if let ListenerEvent::AddressExpired(_) = self {
true
} else {
false
}
}
/// Try to turn this listener event into the `AddressExpired` part. /// Try to turn this listener event into the `AddressExpired` part.
/// ///
/// Returns `None` if the event is not actually a `AddressExpired`, /// Returns `None` if the event is not actually a `AddressExpired`,

View File

@ -78,7 +78,7 @@ fn client_to_server_outbound() {
let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new()); let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new());
let future = transport let future = transport
.dial(rx.recv().unwrap().clone()) .dial(rx.recv().unwrap())
.unwrap() .unwrap()
.map_err(|err| panic!("{:?}", err)) .map_err(|err| panic!("{:?}", err))
.and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client))) .and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client)))
@ -142,7 +142,7 @@ fn client_to_server_inbound() {
let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new()); let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new());
let future = transport let future = transport
.dial(rx.recv().unwrap().clone()) .dial(rx.recv().unwrap())
.unwrap() .unwrap()
.map_err(|err| panic!("{:?}", err)) .map_err(|err| panic!("{:?}", err))
.and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client))) .and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client)))

View File

@ -335,7 +335,7 @@ mod tests {
let transport = TcpConfig::new(); let transport = TcpConfig::new();
let future = transport.dial(rx.recv().unwrap().clone()) let future = transport.dial(rx.recv().unwrap())
.unwrap() .unwrap()
.and_then(|socket| { .and_then(|socket| {
apply_outbound(socket, IdentifyProtocolConfig) apply_outbound(socket, IdentifyProtocolConfig)

View File

@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
get_if_addrs = "0.5.3"
libp2p-core = { version = "0.6.0", path = "../../core" } libp2p-core = { version = "0.6.0", path = "../../core" }
log = "0.4.1" log = "0.4.1"
futures = "0.1" futures = "0.1"

View File

@ -38,17 +38,27 @@
//! The `TcpConfig` structs implements the `Transport` trait of the `swarm` library. See the //! The `TcpConfig` structs implements the `Transport` trait of the `swarm` library. See the
//! documentation of `swarm` and of libp2p in general to learn how to use the `Transport` trait. //! documentation of `swarm` and of libp2p in general to learn how to use the `Transport` trait.
use futures::{future, future::FutureResult, prelude::*, Async, Poll}; use futures::{
future::{self, Either, FutureResult},
prelude::*,
stream::{self, Chain, IterOk, Once}
};
use get_if_addrs::get_if_addrs;
use libp2p_core::{Transport, transport::{ListenerEvent, TransportError}}; use libp2p_core::{Transport, transport::{ListenerEvent, TransportError}};
use log::{debug, error}; use log::{debug, error};
use multiaddr::{Protocol, Multiaddr, ToMultiaddr}; use multiaddr::{Protocol, Multiaddr};
use std::fmt; use std::{
use std::io::{self, Read, Write}; collections::{HashMap, VecDeque},
use std::net::SocketAddr; fmt,
use std::time::Duration; io::{self, Read, Write},
iter::{self, FromIterator},
net::{IpAddr, SocketAddr},
time::Duration,
vec::IntoIter
};
use tk_listen::{ListenExt, SleepOnError}; use tk_listen::{ListenExt, SleepOnError};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::{ConnectFuture, Incoming, TcpListener, TcpStream}; use tokio_tcp::{ConnectFuture, Incoming, TcpStream};
/// Represents the configuration for a TCP/IP transport capability for libp2p. /// Represents the configuration for a TCP/IP transport capability for libp2p.
/// ///
@ -72,7 +82,6 @@ pub struct TcpConfig {
impl TcpConfig { impl TcpConfig {
/// Creates a new configuration object for TCP/IP. /// Creates a new configuration object for TCP/IP.
#[inline]
pub fn new() -> TcpConfig { pub fn new() -> TcpConfig {
TcpConfig { TcpConfig {
sleep_on_error: Duration::from_millis(100), sleep_on_error: Duration::from_millis(100),
@ -85,35 +94,30 @@ impl TcpConfig {
} }
/// Sets the size of the recv buffer size to set for opened sockets. /// Sets the size of the recv buffer size to set for opened sockets.
#[inline]
pub fn recv_buffer_size(mut self, value: usize) -> Self { pub fn recv_buffer_size(mut self, value: usize) -> Self {
self.recv_buffer_size = Some(value); self.recv_buffer_size = Some(value);
self self
} }
/// Sets the size of the send buffer size to set for opened sockets. /// Sets the size of the send buffer size to set for opened sockets.
#[inline]
pub fn send_buffer_size(mut self, value: usize) -> Self { pub fn send_buffer_size(mut self, value: usize) -> Self {
self.send_buffer_size = Some(value); self.send_buffer_size = Some(value);
self self
} }
/// Sets the TTL to set for opened sockets. /// Sets the TTL to set for opened sockets.
#[inline]
pub fn ttl(mut self, value: u32) -> Self { pub fn ttl(mut self, value: u32) -> Self {
self.ttl = Some(value); self.ttl = Some(value);
self self
} }
/// Sets the keep alive pinging duration to set for opened sockets. /// Sets the keep alive pinging duration to set for opened sockets.
#[inline]
pub fn keepalive(mut self, value: Option<Duration>) -> Self { pub fn keepalive(mut self, value: Option<Duration>) -> Self {
self.keepalive = Some(value); self.keepalive = Some(value);
self self
} }
/// Sets the `TCP_NODELAY` to set for opened sockets. /// Sets the `TCP_NODELAY` to set for opened sockets.
#[inline]
pub fn nodelay(mut self, value: bool) -> Self { pub fn nodelay(mut self, value: bool) -> Self {
self.nodelay = Some(value); self.nodelay = Some(value);
self self
@ -123,60 +127,87 @@ impl TcpConfig {
impl Transport for TcpConfig { impl Transport for TcpConfig {
type Output = TcpTransStream; type Output = TcpTransStream;
type Error = io::Error; type Error = io::Error;
type Listener = TcpListenStream; type Listener = TcpListener;
type ListenerUpgrade = FutureResult<Self::Output, io::Error>; type ListenerUpgrade = FutureResult<Self::Output, Self::Error>;
type Dial = TcpDialFut; type Dial = TcpDialFut;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> { fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { let socket_addr =
let listener = TcpListener::bind(&socket_addr); if let Ok(sa) = multiaddr_to_socketaddr(&addr) {
// We need to build the `Multiaddr` to return from this function. If an error happened, sa
// just return the original multiaddr. } else {
let listen_addr = match listener { return Err(TransportError::MultiaddrNotSupported(addr))
Ok(ref l) => if let Ok(new_s_addr) = l.local_addr() {
new_s_addr.to_multiaddr()
.expect("multiaddr generated from socket addr is always valid")
} else {
addr
},
Err(_) => addr,
}; };
debug!("Now listening on {}", listen_addr); let listener = tokio_tcp::TcpListener::bind(&socket_addr).map_err(TransportError::Other)?;
let sleep_on_error = self.sleep_on_error; let local_addr = listener.local_addr().map_err(TransportError::Other)?;
let inner = listener let port = local_addr.port();
.map_err(TransportError::Other)?
.incoming()
.sleep_on_error(sleep_on_error);
Ok(TcpListenStream { // Determine all our listen addresses which is either a single local IP address
inner: Ok(inner), // or (if a wildcard IP address was used) the addresses of all our interfaces,
listen_addr, // as reported by `get_if_addrs`.
config: self, let addrs =
tell_new_addr: true if socket_addr.ip().is_unspecified() {
}) let addrs = host_addresses(port).map_err(TransportError::Other)?;
} else { debug!("Listening on {:?}", addrs.values());
Err(TransportError::MultiaddrNotSupported(addr)) Addresses::Many(addrs)
} } else {
let ma = sockaddr_to_multiaddr(local_addr.ip(), port);
debug!("Listening on {:?}", ma);
Addresses::One(ma)
};
// Generate `NewAddress` events for each new `Multiaddr`.
let events = match addrs {
Addresses::One(ref ma) => {
let event = ListenerEvent::NewAddress(ma.clone());
Either::A(stream::once(Ok(event)))
}
Addresses::Many(ref aa) => {
let events = aa.values()
.cloned()
.map(ListenerEvent::NewAddress)
.collect::<Vec<_>>();
Either::B(stream::iter_ok(events))
}
};
let stream = TcpListenStream {
inner: Ok(listener.incoming().sleep_on_error(self.sleep_on_error)),
port,
addrs,
pending: VecDeque::new(),
config: self
};
Ok(TcpListener {
inner: match events {
Either::A(e) => Either::A(e.chain(stream)),
Either::B(e) => Either::B(e.chain(stream))
}
})
} }
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { let socket_addr =
// As an optimization, we check that the address is not of the form `0.0.0.0`. if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
// If so, we instantly refuse dialing instead of going through the kernel. if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
if socket_addr.port() != 0 && !socket_addr.ip().is_unspecified() { debug!("Instantly refusing dialing {}, as it is invalid", addr);
debug!("Dialing {}", addr); return Err(TransportError::Other(io::ErrorKind::ConnectionRefused.into()))
Ok(TcpDialFut { }
inner: TcpStream::connect(&socket_addr), socket_addr
config: self,
})
} else { } else {
debug!("Instantly refusing dialing {}, as it is invalid", addr); return Err(TransportError::MultiaddrNotSupported(addr))
Err(TransportError::Other(io::ErrorKind::ConnectionRefused.into())) };
}
} else { debug!("Dialing {}", addr);
Err(TransportError::MultiaddrNotSupported(addr))
} let future = TcpDialFut {
inner: TcpStream::connect(&socket_addr),
config: self
};
Ok(future)
} }
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> { fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
@ -217,6 +248,26 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
} }
} }
// Create a [`Multiaddr`] from the given IP address and port number.
fn sockaddr_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
let proto = match ip {
IpAddr::V4(ip) => Protocol::Ip4(ip),
IpAddr::V6(ip) => Protocol::Ip6(ip)
};
let it = iter::once(proto).chain(iter::once(Protocol::Tcp(port)));
Multiaddr::from_iter(it)
}
// Collect all local host addresses and use the provided port number as listen port.
fn host_addresses(port: u16) -> io::Result<HashMap<IpAddr, Multiaddr>> {
let mut addrs = HashMap::new();
for iface in get_if_addrs()? {
let ma = sockaddr_to_multiaddr(iface.ip(), port);
addrs.insert(iface.ip(), ma);
}
Ok(addrs)
}
/// Applies the socket configuration parameters to a socket. /// Applies the socket configuration parameters to a socket.
fn apply_config(config: &TcpConfig, socket: &TcpStream) -> Result<(), io::Error> { fn apply_config(config: &TcpConfig, socket: &TcpStream) -> Result<(), io::Error> {
if let Some(recv_buffer_size) = config.recv_buffer_size { if let Some(recv_buffer_size) = config.recv_buffer_size {
@ -270,14 +321,48 @@ impl Future for TcpDialFut {
} }
} }
/// Stream of `ListenerEvent`s.
#[derive(Debug)]
pub struct TcpListener {
inner: Either<
Chain<Once<ListenerEvent<FutureResult<TcpTransStream, io::Error>>, io::Error>, TcpListenStream>,
Chain<IterOk<IntoIter<ListenerEvent<FutureResult<TcpTransStream, io::Error>>>, io::Error>, TcpListenStream>
>
}
impl Stream for TcpListener {
type Item = ListenerEvent<FutureResult<TcpTransStream, io::Error>>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.inner {
Either::A(ref mut it) => it.poll(),
Either::B(ref mut it) => it.poll()
}
}
}
/// Listen address information.
#[derive(Debug)]
enum Addresses {
/// A specific address is used to listen.
One(Multiaddr),
/// A set of addresses is used to listen.
Many(HashMap<IpAddr, Multiaddr>)
}
/// Stream that listens on an TCP/IP address. /// Stream that listens on an TCP/IP address.
pub struct TcpListenStream { pub struct TcpListenStream {
/// Stream of incoming sockets.
inner: Result<SleepOnError<Incoming>, Option<io::Error>>, inner: Result<SleepOnError<Incoming>, Option<io::Error>>,
listen_addr: Multiaddr, /// The port which we use as our listen port in listener event addresses.
port: u16,
/// The set of known addresses.
addrs: Addresses,
/// Temporary buffer of listener events.
pending: VecDeque<ListenerEvent<FutureResult<TcpTransStream, io::Error>>>,
/// Original configuration. /// Original configuration.
config: TcpConfig, config: TcpConfig
/// Generate `ListenerEvent::NewAddress` to inform about our listen address.
tell_new_addr: bool
} }
impl Stream for TcpListenStream { impl Stream for TcpListenStream {
@ -285,58 +370,89 @@ impl Stream for TcpListenStream {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
if self.tell_new_addr {
self.tell_new_addr = false;
return Ok(Async::Ready(Some(ListenerEvent::NewAddress(self.listen_addr.clone()))))
}
let inner = match self.inner { let inner = match self.inner {
Ok(ref mut inc) => inc, Ok(ref mut inc) => inc,
Err(ref mut err) => { Err(ref mut err) => return Err(err.take().expect("poll called again after error"))
return Err(err.take().expect("poll called again after error"));
}
}; };
loop { loop {
match inner.poll() { if let Some(event) = self.pending.pop_front() {
Ok(Async::Ready(Some(sock))) => { return Ok(Async::Ready(Some(event)))
let addr = match sock.peer_addr() { }
// TODO: remove this expect()
Ok(addr) => addr
.to_multiaddr()
.expect("generating a multiaddr from a socket addr never fails"),
Err(err) => {
// If we can't get the address of the newly-opened socket, there's
// nothing we can do except ignore this connection attempt.
error!("Ignored incoming because could't determine its \
address: {:?}", err);
continue
},
};
match apply_config(&self.config, &sock) { let sock = match inner.poll() {
Ok(()) => (), Ok(Async::Ready(Some(sock))) => sock,
Err(err) => { Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
let event = ListenerEvent::Upgrade { Ok(Async::NotReady) => return Ok(Async::NotReady),
upgrade: future::err(err), Err(()) => unreachable!("sleep_on_error never produces an error")
listen_addr: self.listen_addr.clone(), };
remote_addr: addr
};
return Ok(Async::Ready(Some(event)))
}
};
debug!("Incoming connection from {}", addr); let sock_addr = match sock.peer_addr() {
let event = ListenerEvent::Upgrade { Ok(addr) => addr,
upgrade: future::ok(TcpTransStream { inner: sock }), Err(err) => {
listen_addr: self.listen_addr.clone(), error!("Failed to get peer address: {:?}", err);
remote_addr: addr return Err(err)
}; }
break Ok(Async::Ready(Some(event))) };
let listen_addr = match sock.local_addr() {
Ok(addr) => match self.addrs {
Addresses::One(ref ma) => ma.clone(),
Addresses::Many(ref mut addrs) => if let Some(ma) = addrs.get(&addr.ip()) {
ma.clone()
} else {
// The local IP address of this socket is new to us.
// We need to check for changes in the set of host addresses and report
// new and expired addresses.
//
// TODO: We do not detect expired addresses unless there is a new address.
let new_addrs = host_addresses(self.port)?;
let old_addrs = std::mem::replace(addrs, new_addrs);
// Check for addresses no longer in use.
for (ip, ma) in &old_addrs {
if !addrs.contains_key(&ip) {
debug!("Expired listen address: {}", ma);
self.pending.push_back(ListenerEvent::AddressExpired(ma.clone()));
}
}
// Check for new addresses.
for (ip, ma) in addrs {
if !old_addrs.contains_key(&ip) {
debug!("New listen address: {}", ma);
self.pending.push_back(ListenerEvent::NewAddress(ma.clone()));
}
}
sockaddr_to_multiaddr(addr.ip(), self.port)
}
}
Err(err) => {
error!("Failed to get local address of incoming socket: {:?}", err);
return Err(err)
}
};
let remote_addr = sockaddr_to_multiaddr(sock_addr.ip(), sock_addr.port());
match apply_config(&self.config, &sock) {
Ok(()) => {
debug!("Incoming connection from {}", remote_addr);
self.pending.push_back(ListenerEvent::Upgrade {
upgrade: future::ok(TcpTransStream { inner: sock }),
listen_addr,
remote_addr
})
}
Err(err) => {
self.pending.push_back(ListenerEvent::Upgrade {
upgrade: future::err(err),
listen_addr,
remote_addr
})
} }
Ok(Async::Ready(None)) => break Ok(Async::Ready(None)),
Ok(Async::NotReady) => break Ok(Async::NotReady),
Err(()) => unreachable!("sleep_on_error never produces an error"),
} }
} }
} }
@ -403,12 +519,53 @@ mod tests {
use super::{multiaddr_to_socketaddr, TcpConfig}; use super::{multiaddr_to_socketaddr, TcpConfig};
use futures::stream::Stream; use futures::stream::Stream;
use futures::Future; use futures::Future;
use multiaddr::Multiaddr; use multiaddr::{Multiaddr, Protocol};
use std; use std;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use libp2p_core::{Transport, transport::ListenerEvent}; use libp2p_core::{Transport, transport::ListenerEvent};
use tokio_io; use tokio_io;
#[test]
fn wildcard_expansion() {
let mut listener = TcpConfig::new()
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.expect("listener");
// Get the first address.
let addr = listener.by_ref()
.wait()
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");
// Process all initial `NewAddress` events and make sure they
// do not contain wildcard address or port.
let server = listener
.take_while(|event| match event {
ListenerEvent::NewAddress(a) => {
let mut iter = a.iter();
match iter.next().expect("ip address") {
Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
other => panic!("Unexpected protocol: {}", other)
}
if let Protocol::Tcp(port) = iter.next().expect("port") {
assert_ne!(0, port)
} else {
panic!("No TCP port in address: {}", a)
}
Ok(true)
}
_ => Ok(false)
})
.for_each(|_| Ok(()));
let client = TcpConfig::new().dial(addr).expect("dialer");
tokio::run(server.join(client).map(|_| ()).map_err(|e| panic!("error: {}", e)))
}
#[test] #[test]
fn multiaddr_to_tcp_conversion() { fn multiaddr_to_tcp_conversion() {
use std::net::Ipv6Addr; use std::net::Ipv6Addr;