diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index 4a9ed144..58fb9980 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,11 +1,14 @@ # 0.38.0 [unreleased] +- Update to `if-watch` `v3.0.0` and pass through `tokio` and `async-io` features. See [PR 3101]. + - Deprecate types with `Tcp` prefix (`GenTcpConfig`, `TcpTransport` and `TokioTcpTransport`) in favor of referencing them by module / crate. See [PR 2961]. - Remove `TcpListenStream` and `TcpListenerEvent` from public API. See [PR 2961]. - Update to `libp2p-core` `v0.38.0`. +[PR 3101]: https://github.com/libp2p/rust-libp2p/pull/3101 [PR 2961]: https://github.com/libp2p/rust-libp2p/pull/2961 # 0.37.0 diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 502e05cd..585d5a91 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -11,26 +11,26 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-io-crate = { package = "async-io", version = "1.2.0", optional = true } +async-io = { version = "1.2.0", optional = true } futures = "0.3.8" futures-timer = "3.0" -if-watch = "2.0.0" +if-watch = "3.0.0" libc = "0.2.80" libp2p-core = { version = "0.38.0", path = "../../core" } log = "0.4.11" socket2 = { version = "0.4.0", features = ["all"] } -tokio-crate = { package = "tokio", version = "1.19.0", default-features = false, features = ["net"], optional = true } +tokio = { version = "1.19.0", default-features = false, features = ["net"], optional = true } [features] -tokio = ["tokio-crate"] -async-io = ["async-io-crate"] +tokio = ["dep:tokio", "if-watch/tokio"] +async-io = ["dep:async-io", "if-watch/smol"] [dev-dependencies] async-std = { version = "1.6.5", features = ["attributes"] } -tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["full"] } +tokio = { version = "1.0.1", default-features = false, features = ["full"] } env_logger = "0.9.0" -# Passing arguments to the docsrs builder in order to properly document cfg's. +# Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] all-features = true diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 6e760ae5..c36152b2 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -41,7 +41,7 @@ use futures::{ prelude::*, }; use futures_timer::Delay; -use if_watch::{IfEvent, IfWatcher}; +use if_watch::IfEvent; use libp2p_core::{ address_translation, multiaddr::{Multiaddr, Protocol}, @@ -385,7 +385,7 @@ where return TcpListenStream::::new( id, listener, - Some(IfWatcher::new()?), + Some(T::new_if_watcher()?), self.port_reuse.clone(), ); } @@ -656,7 +656,7 @@ where /// become or stop being available. /// /// `None` if the socket is only listening on a single interface. - if_watcher: Option, + if_watcher: Option, /// The port reuse configuration for outgoing connections. /// /// If enabled, all IP addresses on which this listening stream @@ -680,7 +680,7 @@ where fn new( listener_id: ListenerId, listener: TcpListener, - if_watcher: Option, + if_watcher: Option, port_reuse: PortReuse, ) -> io::Result { let listen_addr = listener.local_addr()?; @@ -706,7 +706,7 @@ where fn disable_port_reuse(&mut self) { match &self.if_watcher { Some(if_watcher) => { - for ip_net in if_watcher.iter() { + for ip_net in T::addrs(if_watcher) { self.port_reuse .unregister(ip_net.addr(), self.listen_addr.port()); } @@ -749,7 +749,7 @@ where } if let Some(if_watcher) = me.if_watcher.as_mut() { - while let Poll::Ready(event) = if_watcher.poll_if_event(cx) { + while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) { match event { Ok(IfEvent::Up(inet)) => { let ip = inet.addr(); @@ -986,11 +986,11 @@ mod tests { let (ready_tx, ready_rx) = mpsc::channel(1); let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); - let tasks = tokio_crate::task::LocalSet::new(); + let tasks = ::tokio::task::LocalSet::new(); let listener = tasks.spawn_local(listener); tasks.block_on(&rt, dialer); tasks.block_on(&rt, listener).unwrap(); @@ -1055,11 +1055,11 @@ mod tests { let (ready_tx, ready_rx) = mpsc::channel(1); let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); - let tasks = tokio_crate::task::LocalSet::new(); + let tasks = ::tokio::task::LocalSet::new(); let listener = tasks.spawn_local(listener); tasks.block_on(&rt, dialer); tasks.block_on(&rt, listener).unwrap(); @@ -1162,11 +1162,11 @@ mod tests { let (port_reuse_tx, port_reuse_rx) = oneshot::channel(); let listener = listener::(addr.clone(), ready_tx, port_reuse_rx); let dialer = dialer::(addr, ready_rx, port_reuse_tx); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); - let tasks = tokio_crate::task::LocalSet::new(); + let tasks = ::tokio::task::LocalSet::new(); let listener = tasks.spawn_local(listener); tasks.block_on(&rt, dialer); tasks.block_on(&rt, listener).unwrap(); @@ -1220,7 +1220,7 @@ mod tests { #[cfg(feature = "tokio")] { let listener = listen_twice::(addr); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); @@ -1253,7 +1253,7 @@ mod tests { #[cfg(feature = "tokio")] { - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); diff --git a/transports/tcp/src/provider.rs b/transports/tcp/src/provider.rs index a341026e..d94da7a6 100644 --- a/transports/tcp/src/provider.rs +++ b/transports/tcp/src/provider.rs @@ -28,6 +28,8 @@ pub mod tokio; use futures::future::BoxFuture; use futures::io::{AsyncRead, AsyncWrite}; +use futures::Stream; +use if_watch::{IfEvent, IpNet}; use std::net::{SocketAddr, TcpListener, TcpStream}; use std::task::{Context, Poll}; use std::{fmt, io}; @@ -46,6 +48,14 @@ pub trait Provider: Clone + Send + 'static { type Stream: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug; /// The type of TCP listeners obtained from [`Provider::new_listener`]. type Listener: Send + Unpin; + /// The type of IfWatcher obtained from [`Provider::new_if_watcher`]. + type IfWatcher: Stream> + Send + Unpin; + + /// Create a new IfWatcher responsible for detecting IP address changes. + fn new_if_watcher() -> io::Result; + + /// An iterator over all currently discovered addresses. + fn addrs(_: &Self::IfWatcher) -> Vec; /// Creates a new listener wrapping the given [`TcpListener`] that /// can be polled for incoming connections via [`Self::poll_accept()`]. diff --git a/transports/tcp/src/provider/async_io.rs b/transports/tcp/src/provider/async_io.rs index 0fc1102f..590f109d 100644 --- a/transports/tcp/src/provider/async_io.rs +++ b/transports/tcp/src/provider/async_io.rs @@ -20,7 +20,7 @@ use super::{Incoming, Provider}; -use async_io_crate::Async; +use async_io::Async; use futures::future::{BoxFuture, FutureExt}; use std::io; use std::net; @@ -55,6 +55,15 @@ pub enum Tcp {} impl Provider for Tcp { type Stream = Async; type Listener = Async; + type IfWatcher = if_watch::smol::IfWatcher; + + fn new_if_watcher() -> io::Result { + Self::IfWatcher::new() + } + + fn addrs(if_watcher: &Self::IfWatcher) -> Vec { + if_watcher.iter().copied().collect() + } fn new_listener(l: net::TcpListener) -> io::Result { Async::new(l) diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs index 48647833..e4b75c8d 100644 --- a/transports/tcp/src/provider/tokio.rs +++ b/transports/tcp/src/provider/tokio.rs @@ -39,7 +39,6 @@ use std::task::{Context, Poll}; /// # use libp2p_core::Transport; /// # use futures::future; /// # use std::pin::Pin; -/// # use tokio_crate as tokio; /// # /// # #[tokio::main] /// # async fn main() { @@ -59,17 +58,26 @@ pub enum Tcp {} impl Provider for Tcp { type Stream = TcpStream; - type Listener = tokio_crate::net::TcpListener; + type Listener = tokio::net::TcpListener; + type IfWatcher = if_watch::tokio::IfWatcher; + + fn new_if_watcher() -> io::Result { + Self::IfWatcher::new() + } + + fn addrs(if_watcher: &Self::IfWatcher) -> Vec { + if_watcher.iter().copied().collect() + } fn new_listener(l: net::TcpListener) -> io::Result { - tokio_crate::net::TcpListener::try_from(l) + tokio::net::TcpListener::try_from(l) } fn new_stream(s: net::TcpStream) -> BoxFuture<'static, io::Result> { async move { - // Taken from [`tokio_crate::net::TcpStream::connect_mio`]. + // Taken from [`tokio::net::TcpStream::connect_mio`]. - let stream = tokio_crate::net::TcpStream::try_from(s)?; + let stream = tokio::net::TcpStream::try_from(s)?; // Once we've connected, wait for the stream to be writable as // that's when the actual connection has been initiated. Once we're @@ -109,12 +117,12 @@ impl Provider for Tcp { } } -/// A [`tokio_crate::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`]. +/// A [`tokio::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`]. #[derive(Debug)] -pub struct TcpStream(pub tokio_crate::net::TcpStream); +pub struct TcpStream(pub tokio::net::TcpStream); -impl From for tokio_crate::net::TcpStream { - fn from(t: TcpStream) -> tokio_crate::net::TcpStream { +impl From for tokio::net::TcpStream { + fn from(t: TcpStream) -> tokio::net::TcpStream { t.0 } } @@ -125,8 +133,8 @@ impl AsyncRead for TcpStream { cx: &mut Context, buf: &mut [u8], ) -> Poll> { - let mut read_buf = tokio_crate::io::ReadBuf::new(buf); - futures::ready!(tokio_crate::io::AsyncRead::poll_read( + let mut read_buf = tokio::io::ReadBuf::new(buf); + futures::ready!(tokio::io::AsyncRead::poll_read( Pin::new(&mut self.0), cx, &mut read_buf @@ -141,15 +149,15 @@ impl AsyncWrite for TcpStream { cx: &mut Context, buf: &[u8], ) -> Poll> { - tokio_crate::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf) + tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio_crate::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx) + tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio_crate::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx) + tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx) } fn poll_write_vectored( @@ -157,6 +165,6 @@ impl AsyncWrite for TcpStream { cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { - tokio_crate::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs) + tokio::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs) } }