mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-21 05:41:33 +00:00
Don't poll network unnecessarily. (#1977)
* Don't poll network unnecessarily. * Fix ci. * Damn tokio. * Address review comments. * Update deps. * Don't drop packet if socket is not writable. * Increase TTL and rename to `query_interval`. * Update CHANGELOG. Co-authored-by: Roman S. Borschel <roman@parity.io>
This commit is contained in:
@ -18,33 +18,80 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::service::{MdnsPacket, MdnsService, build_query_response, build_service_discovery_response};
|
||||
use async_io::Timer;
|
||||
use crate::dns::{build_query, build_query_response, build_service_discovery_response};
|
||||
use crate::query::MdnsPacket;
|
||||
use async_io::{Async, Timer};
|
||||
use futures::prelude::*;
|
||||
use if_watch::{IfEvent, IfWatcher};
|
||||
use lazy_static::lazy_static;
|
||||
use libp2p_core::{
|
||||
Multiaddr,
|
||||
PeerId,
|
||||
address_translation,
|
||||
connection::ConnectionId,
|
||||
multiaddr::Protocol
|
||||
address_translation, connection::ConnectionId, multiaddr::Protocol, Multiaddr, PeerId,
|
||||
};
|
||||
use libp2p_swarm::{
|
||||
NetworkBehaviour,
|
||||
NetworkBehaviourAction,
|
||||
PollParameters,
|
||||
ProtocolsHandler,
|
||||
protocols_handler::DummyProtocolsHandler
|
||||
protocols_handler::DummyProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
|
||||
PollParameters, ProtocolsHandler,
|
||||
};
|
||||
use smallvec::SmallVec;
|
||||
use std::{cmp, fmt, io, iter, mem, pin::Pin, time::{Duration, Instant}, task::Context, task::Poll};
|
||||
use socket2::{Domain, Socket, Type};
|
||||
use std::{
|
||||
cmp,
|
||||
collections::VecDeque,
|
||||
fmt, io, iter,
|
||||
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
|
||||
pin::Pin,
|
||||
task::Context,
|
||||
task::Poll,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
const MDNS_RESPONSE_TTL: std::time::Duration = Duration::from_secs(5 * 60);
|
||||
lazy_static! {
|
||||
static ref IPV4_MDNS_MULTICAST_ADDRESS: SocketAddr =
|
||||
SocketAddr::from((Ipv4Addr::new(224, 0, 0, 251), 5353));
|
||||
}
|
||||
|
||||
pub struct MdnsConfig {
|
||||
/// TTL to use for mdns records.
|
||||
pub ttl: Duration,
|
||||
/// Interval at which to poll the network for new peers. This isn't
|
||||
/// necessary during normal operation but avoids the case that an
|
||||
/// initial packet was lost and not discovering any peers until a new
|
||||
/// peer joins the network. Receiving an mdns packet resets the timer
|
||||
/// preventing unnecessary traffic.
|
||||
pub query_interval: Duration,
|
||||
}
|
||||
|
||||
impl Default for MdnsConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
ttl: Duration::from_secs(6 * 60),
|
||||
query_interval: Duration::from_secs(5 * 60),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds
|
||||
/// them to the topology.
|
||||
#[derive(Debug)]
|
||||
pub struct Mdns {
|
||||
/// The inner service.
|
||||
service: MdnsBusyWrapper,
|
||||
/// Main socket for listening.
|
||||
recv_socket: Async<UdpSocket>,
|
||||
|
||||
/// Query socket for making queries.
|
||||
send_socket: Async<UdpSocket>,
|
||||
|
||||
/// Iface watcher.
|
||||
if_watch: IfWatcher,
|
||||
|
||||
/// Buffer used for receiving data from the main socket.
|
||||
/// RFC6762 discourages packets larger than the interface MTU, but allows sizes of up to 9000
|
||||
/// bytes, if it can be ensured that all participating devices can handle such large packets.
|
||||
/// For computers with several interfaces and IP addresses responses can easily reach sizes in
|
||||
/// the range of 3000 bytes, so 4096 seems sensible for now. For more information see
|
||||
/// [rfc6762](https://tools.ietf.org/html/rfc6762#page-46).
|
||||
recv_buffer: [u8; 4096],
|
||||
|
||||
/// Buffers pending to send on the main socket.
|
||||
send_buffer: VecDeque<Vec<u8>>,
|
||||
|
||||
/// List of nodes that we have discovered, the address, and when their TTL expires.
|
||||
///
|
||||
@ -56,45 +103,55 @@ pub struct Mdns {
|
||||
///
|
||||
/// `None` if `discovered_nodes` is empty.
|
||||
closest_expiration: Option<Timer>,
|
||||
}
|
||||
|
||||
/// `MdnsService::next` takes ownership of `self`, returning a future that resolves with both itself
|
||||
/// and a `MdnsPacket` (similar to the old Tokio socket send style). The two states are thus `Free`
|
||||
/// with an `MdnsService` or `Busy` with a future returning the original `MdnsService` and an
|
||||
/// `MdnsPacket`.
|
||||
enum MdnsBusyWrapper {
|
||||
Free(MdnsService),
|
||||
Busy(Pin<Box<dyn Future<Output = (MdnsService, MdnsPacket)> + Send>>),
|
||||
Poisoned,
|
||||
}
|
||||
/// Queued events.
|
||||
events: VecDeque<MdnsEvent>,
|
||||
|
||||
impl fmt::Debug for MdnsBusyWrapper {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Free(service) => {
|
||||
fmt.debug_struct("MdnsBusyWrapper::Free")
|
||||
.field("service", service)
|
||||
.finish()
|
||||
},
|
||||
Self::Busy(_) => {
|
||||
fmt.debug_struct("MdnsBusyWrapper::Busy")
|
||||
.finish()
|
||||
}
|
||||
Self::Poisoned => {
|
||||
fmt.debug_struct("MdnsBusyWrapper::Poisoned")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Discovery interval.
|
||||
query_interval: Duration,
|
||||
|
||||
/// Record ttl.
|
||||
ttl: Duration,
|
||||
|
||||
/// Discovery timer.
|
||||
timeout: Timer,
|
||||
}
|
||||
|
||||
impl Mdns {
|
||||
/// Builds a new `Mdns` behaviour.
|
||||
pub async fn new() -> io::Result<Self> {
|
||||
pub async fn new(config: MdnsConfig) -> io::Result<Self> {
|
||||
let recv_socket = {
|
||||
let socket = Socket::new(
|
||||
Domain::ipv4(),
|
||||
Type::dgram(),
|
||||
Some(socket2::Protocol::udp()),
|
||||
)?;
|
||||
socket.set_reuse_address(true)?;
|
||||
#[cfg(unix)]
|
||||
socket.set_reuse_port(true)?;
|
||||
socket.bind(&SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 5353).into())?;
|
||||
let socket = socket.into_udp_socket();
|
||||
socket.set_multicast_loop_v4(true)?;
|
||||
socket.set_multicast_ttl_v4(255)?;
|
||||
Async::new(socket)?
|
||||
};
|
||||
let send_socket = {
|
||||
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0))?;
|
||||
Async::new(socket)?
|
||||
};
|
||||
let if_watch = if_watch::IfWatcher::new().await?;
|
||||
Ok(Self {
|
||||
service: MdnsBusyWrapper::Free(MdnsService::new().await?),
|
||||
recv_socket,
|
||||
send_socket,
|
||||
if_watch,
|
||||
recv_buffer: [0; 4096],
|
||||
send_buffer: Default::default(),
|
||||
discovered_nodes: SmallVec::new(),
|
||||
closest_expiration: None,
|
||||
events: Default::default(),
|
||||
query_interval: config.query_interval,
|
||||
ttl: config.ttl,
|
||||
timeout: Timer::interval(config.query_interval),
|
||||
})
|
||||
}
|
||||
|
||||
@ -107,6 +164,77 @@ impl Mdns {
|
||||
pub fn discovered_nodes(&self) -> impl ExactSizeIterator<Item = &PeerId> {
|
||||
self.discovered_nodes.iter().map(|(p, _, _)| p)
|
||||
}
|
||||
|
||||
fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) {
|
||||
self.timeout.set_interval(self.query_interval);
|
||||
match packet {
|
||||
MdnsPacket::Query(query) => {
|
||||
for packet in build_query_response(
|
||||
query.query_id(),
|
||||
*params.local_peer_id(),
|
||||
params.listened_addresses(),
|
||||
self.ttl,
|
||||
) {
|
||||
self.send_buffer.push_back(packet);
|
||||
}
|
||||
}
|
||||
MdnsPacket::Response(response) => {
|
||||
// We replace the IP address with the address we observe the
|
||||
// remote as and the address they listen on.
|
||||
let obs_ip = Protocol::from(response.remote_addr().ip());
|
||||
let obs_port = Protocol::Udp(response.remote_addr().port());
|
||||
let observed: Multiaddr = iter::once(obs_ip).chain(iter::once(obs_port)).collect();
|
||||
|
||||
let mut discovered: SmallVec<[_; 4]> = SmallVec::new();
|
||||
for peer in response.discovered_peers() {
|
||||
if peer.id() == params.local_peer_id() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let new_expiration = Instant::now() + peer.ttl();
|
||||
|
||||
let mut addrs: Vec<Multiaddr> = Vec::new();
|
||||
for addr in peer.addresses() {
|
||||
if let Some(new_addr) = address_translation(&addr, &observed) {
|
||||
addrs.push(new_addr.clone())
|
||||
}
|
||||
addrs.push(addr.clone())
|
||||
}
|
||||
|
||||
for addr in addrs {
|
||||
if let Some((_, _, cur_expires)) = self
|
||||
.discovered_nodes
|
||||
.iter_mut()
|
||||
.find(|(p, a, _)| p == peer.id() && *a == addr)
|
||||
{
|
||||
*cur_expires = cmp::max(*cur_expires, new_expiration);
|
||||
} else {
|
||||
self.discovered_nodes
|
||||
.push((*peer.id(), addr.clone(), new_expiration));
|
||||
}
|
||||
discovered.push((*peer.id(), addr));
|
||||
}
|
||||
}
|
||||
|
||||
self.closest_expiration = self
|
||||
.discovered_nodes
|
||||
.iter()
|
||||
.fold(None, |exp, &(_, _, elem_exp)| {
|
||||
Some(exp.map(|exp| cmp::min(exp, elem_exp)).unwrap_or(elem_exp))
|
||||
})
|
||||
.map(Timer::at);
|
||||
|
||||
self.events
|
||||
.push_back(MdnsEvent::Discovered(DiscoveredAddrsIter {
|
||||
inner: discovered.into_iter(),
|
||||
}));
|
||||
}
|
||||
MdnsPacket::ServiceDiscovery(disc) => {
|
||||
let resp = build_service_discovery_response(disc.query_id(), self.ttl);
|
||||
self.send_buffer.push_back(resp);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkBehaviour for Mdns {
|
||||
@ -149,138 +277,102 @@ impl NetworkBehaviour for Mdns {
|
||||
Self::OutEvent,
|
||||
>,
|
||||
> {
|
||||
// Remove expired peers.
|
||||
if let Some(ref mut closest_expiration) = self.closest_expiration {
|
||||
match Pin::new(closest_expiration).poll(cx) {
|
||||
Poll::Ready(now) => {
|
||||
let mut expired = SmallVec::<[(PeerId, Multiaddr); 4]>::new();
|
||||
while let Some(pos) = self.discovered_nodes.iter().position(|(_, _, exp)| *exp < now) {
|
||||
let (peer_id, addr, _) = self.discovered_nodes.remove(pos);
|
||||
expired.push((peer_id, addr));
|
||||
while let Poll::Ready(event) = Pin::new(&mut self.if_watch).poll(cx) {
|
||||
let multicast = From::from([224, 0, 0, 251]);
|
||||
let socket = self.recv_socket.get_ref();
|
||||
match event {
|
||||
Ok(IfEvent::Up(inet)) => {
|
||||
if inet.addr().is_loopback() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if !expired.is_empty() {
|
||||
let event = MdnsEvent::Expired(ExpiredAddrsIter {
|
||||
inner: expired.into_iter(),
|
||||
});
|
||||
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
|
||||
if let IpAddr::V4(addr) = inet.addr() {
|
||||
log::trace!("joining multicast on iface {}", addr);
|
||||
if let Err(err) = socket.join_multicast_v4(&multicast, &addr) {
|
||||
log::error!("join multicast failed: {}", err);
|
||||
} else {
|
||||
self.send_buffer.push_back(build_query());
|
||||
}
|
||||
}
|
||||
},
|
||||
Poll::Pending => (),
|
||||
}
|
||||
Ok(IfEvent::Down(inet)) => {
|
||||
if inet.addr().is_loopback() {
|
||||
continue;
|
||||
}
|
||||
if let IpAddr::V4(addr) = inet.addr() {
|
||||
log::trace!("leaving multicast on iface {}", addr);
|
||||
if let Err(err) = socket.leave_multicast_v4(&multicast, &addr) {
|
||||
log::error!("leave multicast failed: {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => log::error!("if watch returned an error: {}", err),
|
||||
}
|
||||
}
|
||||
|
||||
// Polling the mDNS service, and obtain the list of nodes discovered this round.
|
||||
let discovered = loop {
|
||||
let service = mem::replace(&mut self.service, MdnsBusyWrapper::Poisoned);
|
||||
|
||||
let packet = match service {
|
||||
MdnsBusyWrapper::Free(service) => {
|
||||
self.service = MdnsBusyWrapper::Busy(Box::pin(service.next()));
|
||||
continue;
|
||||
},
|
||||
MdnsBusyWrapper::Busy(mut fut) => {
|
||||
match fut.as_mut().poll(cx) {
|
||||
Poll::Ready((service, packet)) => {
|
||||
self.service = MdnsBusyWrapper::Free(service);
|
||||
packet
|
||||
},
|
||||
Poll::Pending => {
|
||||
self.service = MdnsBusyWrapper::Busy(fut);
|
||||
return Poll::Pending;
|
||||
}
|
||||
// Poll receive socket.
|
||||
while self.recv_socket.poll_readable(cx).is_ready() {
|
||||
match self
|
||||
.recv_socket
|
||||
.recv_from(&mut self.recv_buffer)
|
||||
.now_or_never()
|
||||
{
|
||||
Some(Ok((len, from))) => {
|
||||
if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from)
|
||||
{
|
||||
self.inject_mdns_packet(packet, params);
|
||||
}
|
||||
},
|
||||
MdnsBusyWrapper::Poisoned => panic!("Mdns poisoned"),
|
||||
};
|
||||
|
||||
match packet {
|
||||
MdnsPacket::Query(query) => {
|
||||
// MaybeBusyMdnsService should always be Free.
|
||||
if let MdnsBusyWrapper::Free(ref mut service) = self.service {
|
||||
for packet in build_query_response(
|
||||
query.query_id(),
|
||||
*params.local_peer_id(),
|
||||
params.listened_addresses(),
|
||||
MDNS_RESPONSE_TTL,
|
||||
) {
|
||||
service.enqueue_response(packet)
|
||||
}
|
||||
} else { debug_assert!(false); }
|
||||
},
|
||||
MdnsPacket::Response(response) => {
|
||||
// We replace the IP address with the address we observe the
|
||||
// remote as and the address they listen on.
|
||||
let obs_ip = Protocol::from(response.remote_addr().ip());
|
||||
let obs_port = Protocol::Udp(response.remote_addr().port());
|
||||
let observed: Multiaddr = iter::once(obs_ip)
|
||||
.chain(iter::once(obs_port))
|
||||
.collect();
|
||||
|
||||
let mut discovered: SmallVec<[_; 4]> = SmallVec::new();
|
||||
for peer in response.discovered_peers() {
|
||||
if peer.id() == params.local_peer_id() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let new_expiration = Instant::now() + peer.ttl();
|
||||
|
||||
let mut addrs: Vec<Multiaddr> = Vec::new();
|
||||
for addr in peer.addresses() {
|
||||
if let Some(new_addr) = address_translation(&addr, &observed) {
|
||||
addrs.push(new_addr.clone())
|
||||
}
|
||||
addrs.push(addr.clone())
|
||||
}
|
||||
|
||||
for addr in addrs {
|
||||
if let Some((_, _, cur_expires)) = self.discovered_nodes.iter_mut()
|
||||
.find(|(p, a, _)| p == peer.id() && *a == addr)
|
||||
{
|
||||
*cur_expires = cmp::max(*cur_expires, new_expiration);
|
||||
} else {
|
||||
self.discovered_nodes.push((*peer.id(), addr.clone(), new_expiration));
|
||||
}
|
||||
|
||||
discovered.push((*peer.id(), addr));
|
||||
}
|
||||
}
|
||||
|
||||
break discovered;
|
||||
},
|
||||
MdnsPacket::ServiceDiscovery(disc) => {
|
||||
// MaybeBusyMdnsService should always be Free.
|
||||
if let MdnsBusyWrapper::Free(ref mut service) = self.service {
|
||||
let resp = build_service_discovery_response(
|
||||
disc.query_id(),
|
||||
MDNS_RESPONSE_TTL,
|
||||
);
|
||||
service.enqueue_response(resp);
|
||||
} else { debug_assert!(false); }
|
||||
},
|
||||
}
|
||||
Some(Err(err)) => log::error!("Failed reading datagram: {}", err),
|
||||
_ => {}
|
||||
}
|
||||
};
|
||||
}
|
||||
if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
|
||||
self.send_buffer.push_back(build_query());
|
||||
}
|
||||
// Send responses.
|
||||
if !self.send_buffer.is_empty() {
|
||||
while self.send_socket.poll_writable(cx).is_ready() {
|
||||
if let Some(packet) = self.send_buffer.pop_front() {
|
||||
match self
|
||||
.send_socket
|
||||
.send_to(&packet, *IPV4_MDNS_MULTICAST_ADDRESS)
|
||||
.now_or_never()
|
||||
{
|
||||
Some(Ok(_)) => {}
|
||||
Some(Err(err)) => log::error!("{}", err),
|
||||
None => self.send_buffer.push_front(packet),
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Emit discovered event.
|
||||
if let Some(event) = self.events.pop_front() {
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
|
||||
}
|
||||
// Emit expired event.
|
||||
if let Some(ref mut closest_expiration) = self.closest_expiration {
|
||||
if let Poll::Ready(now) = Pin::new(closest_expiration).poll(cx) {
|
||||
let mut expired = SmallVec::<[(PeerId, Multiaddr); 4]>::new();
|
||||
while let Some(pos) = self
|
||||
.discovered_nodes
|
||||
.iter()
|
||||
.position(|(_, _, exp)| *exp < now)
|
||||
{
|
||||
let (peer_id, addr, _) = self.discovered_nodes.remove(pos);
|
||||
expired.push((peer_id, addr));
|
||||
}
|
||||
|
||||
// Getting this far implies that we discovered new nodes. As the final step, we need to
|
||||
// refresh `closest_expiration`.
|
||||
self.closest_expiration = self.discovered_nodes.iter()
|
||||
.fold(None, |exp, &(_, _, elem_exp)| {
|
||||
Some(exp.map(|exp| cmp::min(exp, elem_exp)).unwrap_or(elem_exp))
|
||||
})
|
||||
.map(Timer::at);
|
||||
if !expired.is_empty() {
|
||||
let event = MdnsEvent::Expired(ExpiredAddrsIter {
|
||||
inner: expired.into_iter(),
|
||||
});
|
||||
|
||||
Poll::Ready(NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(DiscoveredAddrsIter {
|
||||
inner: discovered.into_iter(),
|
||||
})))
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Mdns {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt.debug_struct("Mdns")
|
||||
.field("service", &self.service)
|
||||
.finish()
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
@ -299,7 +391,7 @@ pub enum MdnsEvent {
|
||||
|
||||
/// Iterator that produces the list of addresses that have been discovered.
|
||||
pub struct DiscoveredAddrsIter {
|
||||
inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>
|
||||
inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>,
|
||||
}
|
||||
|
||||
impl Iterator for DiscoveredAddrsIter {
|
||||
@ -316,19 +408,17 @@ impl Iterator for DiscoveredAddrsIter {
|
||||
}
|
||||
}
|
||||
|
||||
impl ExactSizeIterator for DiscoveredAddrsIter {
|
||||
}
|
||||
impl ExactSizeIterator for DiscoveredAddrsIter {}
|
||||
|
||||
impl fmt::Debug for DiscoveredAddrsIter {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt.debug_struct("DiscoveredAddrsIter")
|
||||
.finish()
|
||||
fmt.debug_struct("DiscoveredAddrsIter").finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterator that produces the list of addresses that have expired.
|
||||
pub struct ExpiredAddrsIter {
|
||||
inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>
|
||||
inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>,
|
||||
}
|
||||
|
||||
impl Iterator for ExpiredAddrsIter {
|
||||
@ -345,12 +435,10 @@ impl Iterator for ExpiredAddrsIter {
|
||||
}
|
||||
}
|
||||
|
||||
impl ExactSizeIterator for ExpiredAddrsIter {
|
||||
}
|
||||
impl ExactSizeIterator for ExpiredAddrsIter {}
|
||||
|
||||
impl fmt::Debug for ExpiredAddrsIter {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt.debug_struct("ExpiredAddrsIter")
|
||||
.finish()
|
||||
fmt.debug_struct("ExpiredAddrsIter").finish()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user