diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 836798a1..c39fb524 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -12,6 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] bytes = "0.4" get_if_addrs = "0.5.3" +ipnet = "2.0.0" libp2p-core = { version = "0.8.0", path = "../../core" } log = "0.4.1" futures = "0.1" diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 5edfb442..d0475ae7 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -43,7 +43,8 @@ use futures::{ prelude::*, stream::{self, Chain, IterOk, Once} }; -use get_if_addrs::get_if_addrs; +use get_if_addrs::{IfAddr, get_if_addrs}; +use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use libp2p_core::{ Transport, multiaddr::{Protocol, Multiaddr}, @@ -51,7 +52,7 @@ use libp2p_core::{ }; use log::{debug, error, trace}; use std::{ - collections::{HashMap, VecDeque}, + collections::VecDeque, fmt, io::{self, Read, Write}, iter::{self, FromIterator}, @@ -152,10 +153,10 @@ impl Transport for TcpConfig { let addrs = if socket_addr.ip().is_unspecified() { let addrs = host_addresses(port).map_err(TransportError::Other)?; - debug!("Listening on {:?}", addrs.values()); + debug!("Listening on {:?}", addrs.iter().map(|(_, _, ma)| ma).collect::>()); Addresses::Many(addrs) } else { - let ma = sockaddr_to_multiaddr(local_addr.ip(), port); + let ma = ip_to_multiaddr(local_addr.ip(), port); debug!("Listening on {:?}", ma); Addresses::One(ma) }; @@ -167,7 +168,8 @@ impl Transport for TcpConfig { Either::A(stream::once(Ok(event))) } Addresses::Many(ref aa) => { - let events = aa.values() + let events = aa.iter() + .map(|(_, _, ma)| ma) .cloned() .map(ListenerEvent::NewAddress) .collect::>(); @@ -232,7 +234,7 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { } // Create a [`Multiaddr`] from the given IP address and port number. -fn sockaddr_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr { +fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr { let proto = match ip { IpAddr::V4(ip) => Protocol::Ip4(ip), IpAddr::V6(ip) => Protocol::Ip6(ip) @@ -242,11 +244,26 @@ fn sockaddr_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr { } // Collect all local host addresses and use the provided port number as listen port. -fn host_addresses(port: u16) -> io::Result> { - let mut addrs = HashMap::new(); +fn host_addresses(port: u16) -> io::Result> { + let mut addrs = Vec::new(); for iface in get_if_addrs()? { - let ma = sockaddr_to_multiaddr(iface.ip(), port); - addrs.insert(iface.ip(), ma); + let ip = iface.ip(); + let ma = ip_to_multiaddr(ip, port); + let ipn = match iface.addr { + IfAddr::V4(ip4) => { + let prefix_len = (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros(); + let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8) + .expect("prefix_len is the number of bits in a u32, so can not exceed 32"); + IpNet::V4(ipnet) + } + IfAddr::V6(ip6) => { + let prefix_len = (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros(); + let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8) + .expect("prefix_len is the number of bits in a u128, so can not exceed 128"); + IpNet::V6(ipnet) + } + }; + addrs.push((ip, ipn, ma)) } Ok(addrs) } @@ -331,9 +348,11 @@ enum Addresses { /// A specific address is used to listen. One(Multiaddr), /// A set of addresses is used to listen. - Many(HashMap) + Many(Vec<(IpAddr, IpNet, Multiaddr)>) } +type Buffer = VecDeque>>; + /// Stream that listens on an TCP/IP address. pub struct TcpListenStream { /// Stream of incoming sockets. @@ -343,11 +362,67 @@ pub struct TcpListenStream { /// The set of known addresses. addrs: Addresses, /// Temporary buffer of listener events. - pending: VecDeque>>, + pending: Buffer, /// Original configuration. config: TcpConfig } +// Map a `SocketAddr` to the corresponding `Multiaddr`. +// If not found, check for host address changes. +// This is a function rather than a method due to borrowing issues. +fn map_addr(addr: &SocketAddr, addrs: &mut Addresses, pending: &mut Buffer, port: u16) + -> Result +{ + match addrs { + Addresses::One(ref ma) => Ok(ma.clone()), + Addresses::Many(ref mut addrs) => { + // Check for exact match: + if let Some((_, _, ma)) = addrs.iter().find(|(i, ..)| i == &addr.ip()) { + return Ok(ma.clone()) + } + + // No exact match => check netmask + if let Some((_, _, ma)) = addrs.iter().find(|(_, i, _)| i.contains(&addr.ip())) { + return Ok(ma.clone()) + } + + // The local IP address of this socket is new to us. + // We need to check for changes in the set of host addresses and report new + // and expired addresses. + // + // TODO: We do not detect expired addresses unless there is a new address. + + let new_addrs = host_addresses(port)?; + let old_addrs = std::mem::replace(addrs, new_addrs); + + // Check for addresses no longer in use. + for (ip, _, ma) in old_addrs.iter() { + if addrs.iter().find(|(i, ..)| i == ip).is_none() { + debug!("Expired listen address: {}", ma); + pending.push_back(ListenerEvent::AddressExpired(ma.clone())); + } + } + + // Check for new addresses. + for (ip, _, ma) in addrs.iter() { + if old_addrs.iter().find(|(i, ..)| i == ip).is_none() { + debug!("New listen address: {}", ma); + pending.push_back(ListenerEvent::NewAddress(ma.clone())); + } + } + + // We should now be able to find the listen address of the local socket address, + // if not something is seriously wrong and we report an error. + if addrs.iter().find(|(i, j, _)| i == &addr.ip() || j.contains(&addr.ip())).is_none() { + let msg = format!("{} does not match any listen address", addr.ip()); + return Err(io::Error::new(io::ErrorKind::Other, msg)) + } + + Ok(ip_to_multiaddr(addr.ip(), port)) + } + } +} + impl Stream for TcpListenStream { type Item = ListenerEvent>; type Error = io::Error; @@ -379,46 +454,14 @@ impl Stream for TcpListenStream { }; let listen_addr = match sock.local_addr() { - Ok(addr) => match self.addrs { - Addresses::One(ref ma) => ma.clone(), - Addresses::Many(ref mut addrs) => if let Some(ma) = addrs.get(&addr.ip()) { - ma.clone() - } else { - // The local IP address of this socket is new to us. - // We need to check for changes in the set of host addresses and report - // new and expired addresses. - // - // TODO: We do not detect expired addresses unless there is a new address. - - let new_addrs = host_addresses(self.port)?; - let old_addrs = std::mem::replace(addrs, new_addrs); - - // Check for addresses no longer in use. - for (ip, ma) in &old_addrs { - if !addrs.contains_key(&ip) { - debug!("Expired listen address: {}", ma); - self.pending.push_back(ListenerEvent::AddressExpired(ma.clone())); - } - } - - // Check for new addresses. - for (ip, ma) in addrs { - if !old_addrs.contains_key(&ip) { - debug!("New listen address: {}", ma); - self.pending.push_back(ListenerEvent::NewAddress(ma.clone())); - } - } - - sockaddr_to_multiaddr(addr.ip(), self.port) - } - } + Ok(addr) => map_addr(&addr, &mut self.addrs, &mut self.pending, self.port)?, Err(err) => { error!("Failed to get local address of incoming socket: {:?}", err); return Err(err) } }; - let remote_addr = sockaddr_to_multiaddr(sock_addr.ip(), sock_addr.port()); + let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port()); match apply_config(&self.config, &sock) { Ok(()) => { @@ -459,7 +502,6 @@ pub struct TcpTransStream { } impl Read for TcpTransStream { - #[inline] fn read(&mut self, buf: &mut [u8]) -> Result { self.inner.read(buf) } @@ -476,26 +518,22 @@ impl AsyncRead for TcpTransStream { } impl Write for TcpTransStream { - #[inline] fn write(&mut self, buf: &[u8]) -> Result { self.inner.write(buf) } - #[inline] fn flush(&mut self) -> Result<(), io::Error> { self.inner.flush() } } impl AsyncWrite for TcpTransStream { - #[inline] fn shutdown(&mut self) -> Poll<(), io::Error> { AsyncWrite::shutdown(&mut self.inner) } } impl Drop for TcpTransStream { - #[inline] fn drop(&mut self) { if let Ok(addr) = self.inner.peer_addr() { debug!("Dropped TCP connection to {:?}", addr); @@ -507,17 +545,11 @@ impl Drop for TcpTransStream { #[cfg(test)] mod tests { - use tokio::runtime::current_thread::Runtime; - use super::{multiaddr_to_socketaddr, TcpConfig}; - use futures::stream::Stream; - use futures::Future; - use std; + use futures::prelude::*; + use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use libp2p_core::{ - Transport, - multiaddr::{Multiaddr, Protocol}, - transport::ListenerEvent - }; + use super::{multiaddr_to_socketaddr, TcpConfig}; + use tokio::runtime::current_thread::Runtime; use tokio_io; #[test]