TCP transport: change listen address selection. (#1132)

* Map all 127.0.0.0/8 addresses to 127.0.0.1.

Since every local socket address in the 127.0.0.0/8 space is looped back
to 127.0.0.1/32 we should only have to report the later as the listen
address. For other addresses we still attempt to discover host addresses
when we encounter an unknown local address. We now also check that after
the host addresses have been reset that the address is now found,
otherwise we produce an error.

* Change listen address lookup.

Perform multiple steps:

1. Check for exact address match.
2. Else consider netmask and check for containment.
3. Else re-check host addresses and try 1 & 2 again.
4. Else report an error.

* Small fixes.

* Test and improve prefix_len.

* Simplify and inline the prefix_len logic.
This commit is contained in:
Toralf Wittner 2019-06-03 17:55:15 +02:00 committed by Pierre Krieger
parent fbc6ea5c5e
commit 7394d608a3
2 changed files with 94 additions and 61 deletions

View File

@ -12,6 +12,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
bytes = "0.4"
get_if_addrs = "0.5.3"
ipnet = "2.0.0"
libp2p-core = { version = "0.8.0", path = "../../core" }
log = "0.4.1"
futures = "0.1"

View File

@ -43,7 +43,8 @@ use futures::{
prelude::*,
stream::{self, Chain, IterOk, Once}
};
use get_if_addrs::get_if_addrs;
use get_if_addrs::{IfAddr, get_if_addrs};
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use libp2p_core::{
Transport,
multiaddr::{Protocol, Multiaddr},
@ -51,7 +52,7 @@ use libp2p_core::{
};
use log::{debug, error, trace};
use std::{
collections::{HashMap, VecDeque},
collections::VecDeque,
fmt,
io::{self, Read, Write},
iter::{self, FromIterator},
@ -152,10 +153,10 @@ impl Transport for TcpConfig {
let addrs =
if socket_addr.ip().is_unspecified() {
let addrs = host_addresses(port).map_err(TransportError::Other)?;
debug!("Listening on {:?}", addrs.values());
debug!("Listening on {:?}", addrs.iter().map(|(_, _, ma)| ma).collect::<Vec<_>>());
Addresses::Many(addrs)
} else {
let ma = sockaddr_to_multiaddr(local_addr.ip(), port);
let ma = ip_to_multiaddr(local_addr.ip(), port);
debug!("Listening on {:?}", ma);
Addresses::One(ma)
};
@ -167,7 +168,8 @@ impl Transport for TcpConfig {
Either::A(stream::once(Ok(event)))
}
Addresses::Many(ref aa) => {
let events = aa.values()
let events = aa.iter()
.map(|(_, _, ma)| ma)
.cloned()
.map(ListenerEvent::NewAddress)
.collect::<Vec<_>>();
@ -232,7 +234,7 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
}
// Create a [`Multiaddr`] from the given IP address and port number.
fn sockaddr_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
let proto = match ip {
IpAddr::V4(ip) => Protocol::Ip4(ip),
IpAddr::V6(ip) => Protocol::Ip6(ip)
@ -242,11 +244,26 @@ fn sockaddr_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
}
// Collect all local host addresses and use the provided port number as listen port.
fn host_addresses(port: u16) -> io::Result<HashMap<IpAddr, Multiaddr>> {
let mut addrs = HashMap::new();
fn host_addresses(port: u16) -> io::Result<Vec<(IpAddr, IpNet, Multiaddr)>> {
let mut addrs = Vec::new();
for iface in get_if_addrs()? {
let ma = sockaddr_to_multiaddr(iface.ip(), port);
addrs.insert(iface.ip(), ma);
let ip = iface.ip();
let ma = ip_to_multiaddr(ip, port);
let ipn = match iface.addr {
IfAddr::V4(ip4) => {
let prefix_len = (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros();
let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8)
.expect("prefix_len is the number of bits in a u32, so can not exceed 32");
IpNet::V4(ipnet)
}
IfAddr::V6(ip6) => {
let prefix_len = (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros();
let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8)
.expect("prefix_len is the number of bits in a u128, so can not exceed 128");
IpNet::V6(ipnet)
}
};
addrs.push((ip, ipn, ma))
}
Ok(addrs)
}
@ -331,9 +348,11 @@ enum Addresses {
/// A specific address is used to listen.
One(Multiaddr),
/// A set of addresses is used to listen.
Many(HashMap<IpAddr, Multiaddr>)
Many(Vec<(IpAddr, IpNet, Multiaddr)>)
}
type Buffer = VecDeque<ListenerEvent<FutureResult<TcpTransStream, io::Error>>>;
/// Stream that listens on an TCP/IP address.
pub struct TcpListenStream {
/// Stream of incoming sockets.
@ -343,11 +362,67 @@ pub struct TcpListenStream {
/// The set of known addresses.
addrs: Addresses,
/// Temporary buffer of listener events.
pending: VecDeque<ListenerEvent<FutureResult<TcpTransStream, io::Error>>>,
pending: Buffer,
/// Original configuration.
config: TcpConfig
}
// Map a `SocketAddr` to the corresponding `Multiaddr`.
// If not found, check for host address changes.
// This is a function rather than a method due to borrowing issues.
fn map_addr(addr: &SocketAddr, addrs: &mut Addresses, pending: &mut Buffer, port: u16)
-> Result<Multiaddr, io::Error>
{
match addrs {
Addresses::One(ref ma) => Ok(ma.clone()),
Addresses::Many(ref mut addrs) => {
// Check for exact match:
if let Some((_, _, ma)) = addrs.iter().find(|(i, ..)| i == &addr.ip()) {
return Ok(ma.clone())
}
// No exact match => check netmask
if let Some((_, _, ma)) = addrs.iter().find(|(_, i, _)| i.contains(&addr.ip())) {
return Ok(ma.clone())
}
// The local IP address of this socket is new to us.
// We need to check for changes in the set of host addresses and report new
// and expired addresses.
//
// TODO: We do not detect expired addresses unless there is a new address.
let new_addrs = host_addresses(port)?;
let old_addrs = std::mem::replace(addrs, new_addrs);
// Check for addresses no longer in use.
for (ip, _, ma) in old_addrs.iter() {
if addrs.iter().find(|(i, ..)| i == ip).is_none() {
debug!("Expired listen address: {}", ma);
pending.push_back(ListenerEvent::AddressExpired(ma.clone()));
}
}
// Check for new addresses.
for (ip, _, ma) in addrs.iter() {
if old_addrs.iter().find(|(i, ..)| i == ip).is_none() {
debug!("New listen address: {}", ma);
pending.push_back(ListenerEvent::NewAddress(ma.clone()));
}
}
// We should now be able to find the listen address of the local socket address,
// if not something is seriously wrong and we report an error.
if addrs.iter().find(|(i, j, _)| i == &addr.ip() || j.contains(&addr.ip())).is_none() {
let msg = format!("{} does not match any listen address", addr.ip());
return Err(io::Error::new(io::ErrorKind::Other, msg))
}
Ok(ip_to_multiaddr(addr.ip(), port))
}
}
}
impl Stream for TcpListenStream {
type Item = ListenerEvent<FutureResult<TcpTransStream, io::Error>>;
type Error = io::Error;
@ -379,46 +454,14 @@ impl Stream for TcpListenStream {
};
let listen_addr = match sock.local_addr() {
Ok(addr) => match self.addrs {
Addresses::One(ref ma) => ma.clone(),
Addresses::Many(ref mut addrs) => if let Some(ma) = addrs.get(&addr.ip()) {
ma.clone()
} else {
// The local IP address of this socket is new to us.
// We need to check for changes in the set of host addresses and report
// new and expired addresses.
//
// TODO: We do not detect expired addresses unless there is a new address.
let new_addrs = host_addresses(self.port)?;
let old_addrs = std::mem::replace(addrs, new_addrs);
// Check for addresses no longer in use.
for (ip, ma) in &old_addrs {
if !addrs.contains_key(&ip) {
debug!("Expired listen address: {}", ma);
self.pending.push_back(ListenerEvent::AddressExpired(ma.clone()));
}
}
// Check for new addresses.
for (ip, ma) in addrs {
if !old_addrs.contains_key(&ip) {
debug!("New listen address: {}", ma);
self.pending.push_back(ListenerEvent::NewAddress(ma.clone()));
}
}
sockaddr_to_multiaddr(addr.ip(), self.port)
}
}
Ok(addr) => map_addr(&addr, &mut self.addrs, &mut self.pending, self.port)?,
Err(err) => {
error!("Failed to get local address of incoming socket: {:?}", err);
return Err(err)
}
};
let remote_addr = sockaddr_to_multiaddr(sock_addr.ip(), sock_addr.port());
let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port());
match apply_config(&self.config, &sock) {
Ok(()) => {
@ -459,7 +502,6 @@ pub struct TcpTransStream {
}
impl Read for TcpTransStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
self.inner.read(buf)
}
@ -476,26 +518,22 @@ impl AsyncRead for TcpTransStream {
}
impl Write for TcpTransStream {
#[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
self.inner.write(buf)
}
#[inline]
fn flush(&mut self) -> Result<(), io::Error> {
self.inner.flush()
}
}
impl AsyncWrite for TcpTransStream {
#[inline]
fn shutdown(&mut self) -> Poll<(), io::Error> {
AsyncWrite::shutdown(&mut self.inner)
}
}
impl Drop for TcpTransStream {
#[inline]
fn drop(&mut self) {
if let Ok(addr) = self.inner.peer_addr() {
debug!("Dropped TCP connection to {:?}", addr);
@ -507,17 +545,11 @@ impl Drop for TcpTransStream {
#[cfg(test)]
mod tests {
use tokio::runtime::current_thread::Runtime;
use super::{multiaddr_to_socketaddr, TcpConfig};
use futures::stream::Stream;
use futures::Future;
use std;
use futures::prelude::*;
use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use libp2p_core::{
Transport,
multiaddr::{Multiaddr, Protocol},
transport::ListenerEvent
};
use super::{multiaddr_to_socketaddr, TcpConfig};
use tokio::runtime::current_thread::Runtime;
use tokio_io;
#[test]