mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-18 04:21:22 +00:00
transports/tcp: Unify symbol naming (#2961)
Co-authored-by: Elena Frank <elena.frank@protonmail.com>
This commit is contained in:
@ -48,8 +48,12 @@
|
||||
|
||||
- Remove deprecated features: `tcp-tokio`, `mdns-tokio`, `dns-tokio`, `tcp-async-io`, `mdns-async-io`, `dns-async-std`.
|
||||
See [PR 3001].
|
||||
- Introduce [`libp2p-tls` `v0.1.0`](transports/tls/CHANGELOG.md#010). See [PR 2945].
|
||||
- Update individual crates.
|
||||
- Update to [`libp2p-tcp` `v0.38.0`](transports/tcp/CHANGELOG.md#0380).
|
||||
|
||||
[PR 3001]: https://github.com/libp2p/rust-libp2p/pull/3001
|
||||
[PR 2945]: https://github.com/libp2p/rust-libp2p/pull/2945
|
||||
|
||||
# 0.49.0
|
||||
|
||||
@ -66,8 +70,6 @@
|
||||
|
||||
See [PR 2962].
|
||||
|
||||
- Introduce [`libp2p-tls` `v0.1.0`](transports/tls/CHANGELOG.md#010). See [PR 2945].
|
||||
|
||||
- Update individual crates.
|
||||
- Update to [`libp2p-autonat` `v0.8.0`](protocols/autonat/CHANGELOG.md#0080).
|
||||
- Update to [`libp2p-core` `v0.37.0`](core/CHANGELOG.md#0370).
|
||||
@ -97,7 +99,6 @@
|
||||
|
||||
[PR 2918]: https://github.com/libp2p/rust-libp2p/pull/2918
|
||||
[PR 2962]: https://github.com/libp2p/rust-libp2p/pull/2962
|
||||
[PR 2945]: https://github.com/libp2p/rust-libp2p/pull/2945
|
||||
|
||||
# 0.48.0
|
||||
|
||||
|
@ -115,7 +115,7 @@ smallvec = "1.6.1"
|
||||
libp2p-deflate = { version = "0.37.0", path = "transports/deflate", optional = true }
|
||||
libp2p-dns = { version = "0.37.0", path = "transports/dns", optional = true }
|
||||
libp2p-mdns = { version = "0.41.0", path = "protocols/mdns", optional = true }
|
||||
libp2p-tcp = { version = "0.37.0", path = "transports/tcp", optional = true }
|
||||
libp2p-tcp = { version = "0.38.0", path = "transports/tcp", optional = true }
|
||||
libp2p-websocket = { version = "0.39.0", path = "transports/websocket", optional = true }
|
||||
libp2p-tls = { version = "0.1.0-alpha", path = "transports/tls", optional = true }
|
||||
|
||||
|
@ -29,7 +29,6 @@
|
||||
//! ```
|
||||
|
||||
use futures::StreamExt;
|
||||
use libp2p::tcp::GenTcpConfig;
|
||||
use libp2p::{
|
||||
core::upgrade,
|
||||
floodsub::{self, Floodsub, FloodsubEvent},
|
||||
@ -39,15 +38,9 @@ use libp2p::{
|
||||
// `TokioMdns` is available through the `mdns-tokio` feature.
|
||||
TokioMdns,
|
||||
},
|
||||
mplex,
|
||||
noise,
|
||||
mplex, noise,
|
||||
swarm::{SwarmBuilder, SwarmEvent},
|
||||
// `TokioTcpTransport` is available through the `tcp-tokio` feature.
|
||||
tcp::TokioTcpTransport,
|
||||
Multiaddr,
|
||||
NetworkBehaviour,
|
||||
PeerId,
|
||||
Transport,
|
||||
tcp, Multiaddr, NetworkBehaviour, PeerId, Transport,
|
||||
};
|
||||
use std::error::Error;
|
||||
use tokio::io::{self, AsyncBufReadExt};
|
||||
@ -64,7 +57,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
||||
|
||||
// Create a tokio-based TCP transport use noise for authenticated
|
||||
// encryption and Mplex for multiplexing of substreams on a TCP stream.
|
||||
let transport = TokioTcpTransport::new(GenTcpConfig::default().nodelay(true))
|
||||
let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true))
|
||||
.upgrade(upgrade::Version::V1)
|
||||
.authenticate(
|
||||
noise::NoiseAuthenticated::xx(&id_keys)
|
||||
|
@ -33,7 +33,6 @@
|
||||
//! to work, the ipfs node needs to be configured to use gossipsub.
|
||||
use async_std::io;
|
||||
use futures::{prelude::*, select};
|
||||
use libp2p::tcp::GenTcpConfig;
|
||||
use libp2p::{
|
||||
core::{
|
||||
either::EitherTransport, muxing::StreamMuxerBox, transport, transport::upgrade::Version,
|
||||
@ -44,7 +43,7 @@ use libp2p::{
|
||||
noise, ping,
|
||||
pnet::{PnetConfig, PreSharedKey},
|
||||
swarm::SwarmEvent,
|
||||
tcp::TcpTransport,
|
||||
tcp,
|
||||
yamux::YamuxConfig,
|
||||
Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport,
|
||||
};
|
||||
@ -58,7 +57,7 @@ pub fn build_transport(
|
||||
let noise_config = noise::NoiseAuthenticated::xx(&key_pair).unwrap();
|
||||
let yamux_config = YamuxConfig::default();
|
||||
|
||||
let base_transport = TcpTransport::new(GenTcpConfig::default().nodelay(true));
|
||||
let base_transport = tcp::async_io::Transport::new(tcp::Config::default().nodelay(true));
|
||||
let maybe_encrypted = match psk {
|
||||
Some(psk) => EitherTransport::Left(
|
||||
base_transport.and_then(move |socket, _| PnetConfig::new(psk).handshake(socket)),
|
||||
|
@ -30,9 +30,8 @@ use libp2p::core::muxing::StreamMuxerExt;
|
||||
use libp2p::core::{
|
||||
identity, multiaddr::multiaddr, muxing, transport, upgrade, Multiaddr, PeerId, Transport,
|
||||
};
|
||||
use libp2p::mplex;
|
||||
use libp2p::plaintext::PlainText2Config;
|
||||
use libp2p::tcp::GenTcpConfig;
|
||||
use libp2p::{mplex, tcp};
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -170,7 +169,7 @@ fn tcp_transport(split_send_size: usize) -> BenchTransport {
|
||||
let mut mplex = mplex::MplexConfig::default();
|
||||
mplex.set_split_send_size(split_send_size);
|
||||
|
||||
libp2p::tcp::TcpTransport::new(GenTcpConfig::default().nodelay(true))
|
||||
tcp::async_io::Transport::new(tcp::Config::default().nodelay(true))
|
||||
.upgrade(upgrade::Version::V1)
|
||||
.authenticate(PlainText2Config { local_public_key })
|
||||
.multiplex(mplex)
|
||||
|
@ -30,7 +30,7 @@ use libp2p::identify;
|
||||
use libp2p::noise;
|
||||
use libp2p::relay::v2::client::{self, Client};
|
||||
use libp2p::swarm::{SwarmBuilder, SwarmEvent};
|
||||
use libp2p::tcp::{GenTcpConfig, TcpTransport};
|
||||
use libp2p::tcp;
|
||||
use libp2p::Transport;
|
||||
use libp2p::{dcutr, ping};
|
||||
use libp2p::{identity, NetworkBehaviour, PeerId};
|
||||
@ -90,8 +90,8 @@ fn main() -> Result<(), Box<dyn Error>> {
|
||||
|
||||
let transport = OrTransport::new(
|
||||
relay_transport,
|
||||
block_on(DnsConfig::system(TcpTransport::new(
|
||||
GenTcpConfig::default().port_reuse(true),
|
||||
block_on(DnsConfig::system(tcp::async_io::Transport::new(
|
||||
tcp::Config::default().port_reuse(true),
|
||||
)))
|
||||
.unwrap(),
|
||||
)
|
||||
|
@ -555,7 +555,7 @@ mod tests {
|
||||
use futures::pin_mut;
|
||||
use libp2p::mplex::MplexConfig;
|
||||
use libp2p::noise;
|
||||
use libp2p::tcp::{GenTcpConfig, TcpTransport};
|
||||
use libp2p::tcp;
|
||||
use libp2p_core::{identity, muxing::StreamMuxerBox, transport, upgrade, PeerId, Transport};
|
||||
use libp2p_swarm::{Swarm, SwarmEvent};
|
||||
use std::time::Duration;
|
||||
@ -569,7 +569,7 @@ mod tests {
|
||||
.into_authentic(&id_keys)
|
||||
.unwrap();
|
||||
let pubkey = id_keys.public();
|
||||
let transport = TcpTransport::new(GenTcpConfig::default().nodelay(true))
|
||||
let transport = tcp::async_io::Transport::new(tcp::Config::default().nodelay(true))
|
||||
.upgrade(upgrade::Version::V1)
|
||||
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
||||
.multiplex(MplexConfig::new())
|
||||
|
@ -291,7 +291,7 @@ pub enum UpgradeError {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures::channel::oneshot;
|
||||
use libp2p::tcp::TcpTransport;
|
||||
use libp2p::tcp;
|
||||
use libp2p_core::{
|
||||
identity,
|
||||
upgrade::{self, apply_inbound, apply_outbound},
|
||||
@ -308,7 +308,7 @@ mod tests {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let bg_task = async_std::task::spawn(async move {
|
||||
let mut transport = TcpTransport::default().boxed();
|
||||
let mut transport = tcp::async_io::Transport::default().boxed();
|
||||
|
||||
transport
|
||||
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
|
||||
@ -351,7 +351,7 @@ mod tests {
|
||||
});
|
||||
|
||||
async_std::task::block_on(async move {
|
||||
let mut transport = TcpTransport::default();
|
||||
let mut transport = tcp::async_io::Transport::default();
|
||||
|
||||
let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();
|
||||
let info = apply_outbound(socket, Protocol, upgrade::Version::V1)
|
||||
|
@ -31,7 +31,7 @@ use libp2p::mplex;
|
||||
use libp2p::noise;
|
||||
use libp2p::ping;
|
||||
use libp2p::swarm::{Swarm, SwarmEvent};
|
||||
use libp2p::tcp::{GenTcpConfig, TcpTransport};
|
||||
use libp2p::tcp;
|
||||
use libp2p::yamux;
|
||||
use libp2p::NetworkBehaviour;
|
||||
use libp2p_swarm::keep_alive;
|
||||
@ -246,7 +246,7 @@ fn mk_transport(muxer: MuxerChoice) -> (PeerId, transport::Boxed<(PeerId, Stream
|
||||
let peer_id = id_keys.public().to_peer_id();
|
||||
(
|
||||
peer_id,
|
||||
TcpTransport::new(GenTcpConfig::default().nodelay(true))
|
||||
tcp::async_io::Transport::new(tcp::Config::default().nodelay(true))
|
||||
.upgrade(upgrade::Version::V1)
|
||||
.authenticate(noise::NoiseAuthenticated::xx(&id_keys).unwrap())
|
||||
.multiplex(match muxer {
|
||||
|
@ -25,12 +25,13 @@ use futures::stream::StreamExt;
|
||||
use libp2p::core::upgrade;
|
||||
use libp2p::identify;
|
||||
use libp2p::multiaddr::Protocol;
|
||||
use libp2p::ping;
|
||||
use libp2p::relay::v2::relay::{self, Relay};
|
||||
use libp2p::swarm::{Swarm, SwarmEvent};
|
||||
use libp2p::tcp::TcpTransport;
|
||||
use libp2p::tcp;
|
||||
use libp2p::Transport;
|
||||
use libp2p::{identity, NetworkBehaviour, PeerId};
|
||||
use libp2p::{noise, Multiaddr};
|
||||
use libp2p::{ping, Transport};
|
||||
use std::error::Error;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
|
||||
@ -45,7 +46,7 @@ fn main() -> Result<(), Box<dyn Error>> {
|
||||
let local_peer_id = PeerId::from(local_key.public());
|
||||
println!("Local peer id: {:?}", local_peer_id);
|
||||
|
||||
let tcp_transport = TcpTransport::default();
|
||||
let tcp_transport = tcp::async_io::Transport::default();
|
||||
|
||||
let transport = tcp_transport
|
||||
.upgrade(upgrade::Version::V1)
|
||||
|
@ -25,14 +25,15 @@ use futures::{channel::mpsc, prelude::*, AsyncWriteExt};
|
||||
use libp2p::core::{
|
||||
identity,
|
||||
muxing::StreamMuxerBox,
|
||||
transport::{self, Transport},
|
||||
transport,
|
||||
upgrade::{self, read_length_prefixed, write_length_prefixed},
|
||||
Multiaddr, PeerId,
|
||||
};
|
||||
use libp2p::noise::NoiseAuthenticated;
|
||||
use libp2p::request_response::*;
|
||||
use libp2p::swarm::{Swarm, SwarmEvent};
|
||||
use libp2p::tcp::{GenTcpConfig, TcpTransport};
|
||||
use libp2p::tcp;
|
||||
use libp2p_core::Transport;
|
||||
use rand::{self, Rng};
|
||||
use std::{io, iter};
|
||||
|
||||
@ -298,7 +299,7 @@ fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) {
|
||||
|
||||
(
|
||||
peer_id,
|
||||
TcpTransport::new(GenTcpConfig::default().nodelay(true))
|
||||
tcp::async_io::Transport::new(tcp::Config::default().nodelay(true))
|
||||
.upgrade(upgrade::Version::V1)
|
||||
.authenticate(NoiseAuthenticated::xx(&id_keys).unwrap())
|
||||
.multiplex(libp2p::yamux::YamuxConfig::default())
|
||||
|
14
src/lib.rs
14
src/lib.rs
@ -183,13 +183,13 @@ pub async fn development_transport(
|
||||
keypair: identity::Keypair,
|
||||
) -> std::io::Result<core::transport::Boxed<(PeerId, core::muxing::StreamMuxerBox)>> {
|
||||
let transport = {
|
||||
let dns_tcp = dns::DnsConfig::system(tcp::TcpTransport::new(
|
||||
tcp::GenTcpConfig::new().nodelay(true),
|
||||
let dns_tcp = dns::DnsConfig::system(tcp::async_io::Transport::new(
|
||||
tcp::Config::new().nodelay(true),
|
||||
))
|
||||
.await?;
|
||||
let ws_dns_tcp = websocket::WsConfig::new(
|
||||
dns::DnsConfig::system(tcp::TcpTransport::new(
|
||||
tcp::GenTcpConfig::new().nodelay(true),
|
||||
dns::DnsConfig::system(tcp::async_io::Transport::new(
|
||||
tcp::Config::new().nodelay(true),
|
||||
))
|
||||
.await?,
|
||||
);
|
||||
@ -243,11 +243,11 @@ pub fn tokio_development_transport(
|
||||
keypair: identity::Keypair,
|
||||
) -> std::io::Result<core::transport::Boxed<(PeerId, core::muxing::StreamMuxerBox)>> {
|
||||
let transport = {
|
||||
let dns_tcp = dns::TokioDnsConfig::system(tcp::TokioTcpTransport::new(
|
||||
tcp::GenTcpConfig::new().nodelay(true),
|
||||
let dns_tcp = dns::TokioDnsConfig::system(tcp::tokio::Transport::new(
|
||||
tcp::Config::new().nodelay(true),
|
||||
))?;
|
||||
let ws_dns_tcp = websocket::WsConfig::new(dns::TokioDnsConfig::system(
|
||||
tcp::TokioTcpTransport::new(tcp::GenTcpConfig::new().nodelay(true)),
|
||||
tcp::tokio::Transport::new(tcp::Config::new().nodelay(true)),
|
||||
)?);
|
||||
dns_tcp.or_transport(ws_dns_tcp)
|
||||
};
|
||||
|
@ -21,7 +21,7 @@
|
||||
use futures::{future, prelude::*};
|
||||
use libp2p::core::{transport::Transport, upgrade};
|
||||
use libp2p::deflate::DeflateConfig;
|
||||
use libp2p::tcp::TcpTransport;
|
||||
use libp2p::tcp;
|
||||
use quickcheck::{QuickCheck, TestResult};
|
||||
use rand::RngCore;
|
||||
|
||||
@ -46,7 +46,7 @@ fn lot_of_data() {
|
||||
|
||||
async fn run(message1: Vec<u8>) {
|
||||
let new_transport = || {
|
||||
TcpTransport::default()
|
||||
tcp::async_io::Transport::default()
|
||||
.and_then(|conn, endpoint| {
|
||||
upgrade::apply(
|
||||
conn,
|
||||
|
@ -30,7 +30,7 @@ use libp2p::noise::{
|
||||
Keypair, NoiseAuthenticated, NoiseConfig, NoiseError, NoiseOutput, RemoteIdentity, X25519Spec,
|
||||
X25519,
|
||||
};
|
||||
use libp2p::tcp::TcpTransport;
|
||||
use libp2p::tcp;
|
||||
use log::info;
|
||||
use quickcheck::*;
|
||||
use std::{convert::TryInto, io, net::TcpStream};
|
||||
@ -41,7 +41,7 @@ fn core_upgrade_compat() {
|
||||
// i.e. if it compiles, the "test" is considered a success.
|
||||
let id_keys = identity::Keypair::generate_ed25519();
|
||||
let noise = NoiseAuthenticated::xx(&id_keys).unwrap();
|
||||
let _ = TcpTransport::default()
|
||||
let _ = tcp::async_io::Transport::default()
|
||||
.upgrade(upgrade::Version::V1)
|
||||
.authenticate(noise);
|
||||
}
|
||||
@ -60,7 +60,7 @@ fn xx_spec() {
|
||||
let server_dh = Keypair::<X25519Spec>::new()
|
||||
.into_authentic(&server_id)
|
||||
.unwrap();
|
||||
let server_transport = TcpTransport::default()
|
||||
let server_transport = tcp::async_io::Transport::default()
|
||||
.and_then(move |output, endpoint| {
|
||||
upgrade::apply(
|
||||
output,
|
||||
@ -75,7 +75,7 @@ fn xx_spec() {
|
||||
let client_dh = Keypair::<X25519Spec>::new()
|
||||
.into_authentic(&client_id)
|
||||
.unwrap();
|
||||
let client_transport = TcpTransport::default()
|
||||
let client_transport = tcp::async_io::Transport::default()
|
||||
.and_then(move |output, endpoint| {
|
||||
upgrade::apply(
|
||||
output,
|
||||
@ -107,7 +107,7 @@ fn xx() {
|
||||
let client_id_public = client_id.public();
|
||||
|
||||
let server_dh = Keypair::<X25519>::new().into_authentic(&server_id).unwrap();
|
||||
let server_transport = TcpTransport::default()
|
||||
let server_transport = tcp::async_io::Transport::default()
|
||||
.and_then(move |output, endpoint| {
|
||||
upgrade::apply(
|
||||
output,
|
||||
@ -120,7 +120,7 @@ fn xx() {
|
||||
.boxed();
|
||||
|
||||
let client_dh = Keypair::<X25519>::new().into_authentic(&client_id).unwrap();
|
||||
let client_transport = TcpTransport::default()
|
||||
let client_transport = tcp::async_io::Transport::default()
|
||||
.and_then(move |output, endpoint| {
|
||||
upgrade::apply(
|
||||
output,
|
||||
@ -152,7 +152,7 @@ fn ix() {
|
||||
let client_id_public = client_id.public();
|
||||
|
||||
let server_dh = Keypair::<X25519>::new().into_authentic(&server_id).unwrap();
|
||||
let server_transport = TcpTransport::default()
|
||||
let server_transport = tcp::async_io::Transport::default()
|
||||
.and_then(move |output, endpoint| {
|
||||
upgrade::apply(
|
||||
output,
|
||||
@ -165,7 +165,7 @@ fn ix() {
|
||||
.boxed();
|
||||
|
||||
let client_dh = Keypair::<X25519>::new().into_authentic(&client_id).unwrap();
|
||||
let client_transport = TcpTransport::default()
|
||||
let client_transport = tcp::async_io::Transport::default()
|
||||
.and_then(move |output, endpoint| {
|
||||
upgrade::apply(
|
||||
output,
|
||||
@ -198,7 +198,7 @@ fn ik_xx() {
|
||||
|
||||
let server_dh = Keypair::<X25519>::new().into_authentic(&server_id).unwrap();
|
||||
let server_dh_public = server_dh.public_dh_key().clone();
|
||||
let server_transport = TcpTransport::default()
|
||||
let server_transport = tcp::async_io::Transport::default()
|
||||
.and_then(move |output, endpoint| {
|
||||
if endpoint.is_listener() {
|
||||
Either::Left(apply_inbound(output, NoiseConfig::ik_listener(server_dh)))
|
||||
@ -215,7 +215,7 @@ fn ik_xx() {
|
||||
|
||||
let client_dh = Keypair::<X25519>::new().into_authentic(&client_id).unwrap();
|
||||
let server_id_public2 = server_id_public.clone();
|
||||
let client_transport = TcpTransport::default()
|
||||
let client_transport = tcp::async_io::Transport::default()
|
||||
.and_then(move |output, endpoint| {
|
||||
if endpoint.is_dialer() {
|
||||
Either::Left(apply_outbound(
|
||||
|
@ -1,3 +1,11 @@
|
||||
# 0.38.0 [unreleased]
|
||||
|
||||
- Deprecate types with `Tcp` prefix (`GenTcpConfig`, `TcpTransport` and `TokioTcpTransport`) in favor of referencing them by module / crate. See [PR 2961].
|
||||
|
||||
- Remove `TcpListenStream` and `TcpListenerEvent` from public API. See [PR 2961].
|
||||
|
||||
[PR 2961]: https://github.com/libp2p/rust-libp2p/pull/2961
|
||||
|
||||
# 0.37.0
|
||||
|
||||
- Update to `if-watch` `v2.0.0`. Simplify `IfWatcher` integration.
|
||||
|
@ -3,7 +3,7 @@ name = "libp2p-tcp"
|
||||
edition = "2021"
|
||||
rust-version = "1.56.1"
|
||||
description = "TCP/IP transport protocol for libp2p"
|
||||
version = "0.37.0"
|
||||
version = "0.38.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
@ -27,7 +27,7 @@ async-io = ["async-io-crate"]
|
||||
|
||||
[dev-dependencies]
|
||||
async-std = { version = "1.6.5", features = ["attributes"] }
|
||||
tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt", "macros"] }
|
||||
tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["full"] }
|
||||
env_logger = "0.9.0"
|
||||
|
||||
# Passing arguments to the docsrs builder in order to properly document cfg's.
|
||||
|
@ -18,43 +18,36 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
//! Implementation of the libp2p `Transport` trait for TCP/IP.
|
||||
//! Implementation of the libp2p [`libp2p_core::Transport`] trait for TCP/IP.
|
||||
//!
|
||||
//! # Usage
|
||||
//!
|
||||
//! This crate provides a `TcpTransport` and `TokioTcpTransport`, depending on
|
||||
//! the enabled features, which implement the `Transport` trait for use as a
|
||||
//! This crate provides a [`async_io::Transport`] and [`tokio::Transport`], depending on
|
||||
//! the enabled features, which implement the [`libp2p_core::Transport`] trait for use as a
|
||||
//! transport with `libp2p-core` or `libp2p-swarm`.
|
||||
|
||||
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
|
||||
|
||||
mod provider;
|
||||
|
||||
use if_watch::{IfEvent, IfWatcher};
|
||||
#[cfg(feature = "async-io")]
|
||||
pub use provider::async_io;
|
||||
|
||||
/// The type of a [`GenTcpTransport`] using the `async-io` implementation.
|
||||
#[cfg(feature = "async-io")]
|
||||
pub type TcpTransport = GenTcpTransport<async_io::Tcp>;
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use provider::tokio;
|
||||
|
||||
/// The type of a [`GenTcpTransport`] using the `tokio` implementation.
|
||||
#[cfg(feature = "tokio")]
|
||||
pub type TokioTcpTransport = GenTcpTransport<tokio::Tcp>;
|
||||
|
||||
use futures::{
|
||||
future::{self, Ready},
|
||||
prelude::*,
|
||||
};
|
||||
use futures_timer::Delay;
|
||||
use if_watch::{IfEvent, IfWatcher};
|
||||
use libp2p_core::{
|
||||
address_translation,
|
||||
multiaddr::{Multiaddr, Protocol},
|
||||
transport::{ListenerId, Transport, TransportError, TransportEvent},
|
||||
transport::{ListenerId, TransportError, TransportEvent},
|
||||
};
|
||||
use provider::{Incoming, Provider};
|
||||
use socket2::{Domain, Socket, Type};
|
||||
use std::{
|
||||
collections::{HashSet, VecDeque},
|
||||
@ -66,11 +59,9 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use provider::{Incoming, Provider};
|
||||
|
||||
/// The configuration for a TCP/IP transport capability for libp2p.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct GenTcpConfig {
|
||||
pub struct Config {
|
||||
/// TTL to set for opened sockets, or `None` to keep default.
|
||||
ttl: Option<u32>,
|
||||
/// `TCP_NODELAY` to set for opened sockets, or `None` to keep default.
|
||||
@ -159,17 +150,17 @@ impl PortReuse {
|
||||
}
|
||||
}
|
||||
|
||||
impl GenTcpConfig {
|
||||
impl Config {
|
||||
/// Creates a new configuration for a TCP/IP transport:
|
||||
///
|
||||
/// * Nagle's algorithm, i.e. `TCP_NODELAY`, is _enabled_.
|
||||
/// See [`GenTcpConfig::nodelay`].
|
||||
/// See [`Config::nodelay`].
|
||||
/// * Reuse of listening ports is _disabled_.
|
||||
/// See [`GenTcpConfig::port_reuse`].
|
||||
/// See [`Config::port_reuse`].
|
||||
/// * No custom `IP_TTL` is set. The default of the OS TCP stack applies.
|
||||
/// See [`GenTcpConfig::ttl`].
|
||||
/// See [`Config::ttl`].
|
||||
/// * The size of the listen backlog for new listening sockets is `1024`.
|
||||
/// See [`GenTcpConfig::listen_backlog`].
|
||||
/// See [`Config::listen_backlog`].
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
ttl: None,
|
||||
@ -234,10 +225,10 @@ impl GenTcpConfig {
|
||||
/// > a single outgoing connection to a particular address and port
|
||||
/// > of a peer per local listening socket address.
|
||||
///
|
||||
/// [`GenTcpTransport`] keeps track of the listen socket addresses as they
|
||||
/// [`Transport`] keeps track of the listen socket addresses as they
|
||||
/// are reported by polling it. It is possible to listen on multiple
|
||||
/// addresses, enabling port reuse for each, knowing exactly which listen
|
||||
/// address is reused when dialing with a specific [`GenTcpTransport`], as in the
|
||||
/// address is reused when dialing with a specific [`Transport`], as in the
|
||||
/// following example:
|
||||
///
|
||||
/// ```no_run
|
||||
@ -251,12 +242,11 @@ impl GenTcpConfig {
|
||||
/// #[cfg(feature = "async-io")]
|
||||
/// #[async_std::main]
|
||||
/// async fn main() -> std::io::Result<()> {
|
||||
/// use libp2p_tcp::{GenTcpConfig, TcpTransport};
|
||||
///
|
||||
/// let listen_addr1: Multiaddr = "/ip4/127.0.0.1/tcp/9001".parse().unwrap();
|
||||
/// let listen_addr2: Multiaddr = "/ip4/127.0.0.1/tcp/9002".parse().unwrap();
|
||||
///
|
||||
/// let mut tcp1 = TcpTransport::new(GenTcpConfig::new().port_reuse(true)).boxed();
|
||||
/// let mut tcp1 = libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::new().port_reuse(true)).boxed();
|
||||
/// tcp1.listen_on( listen_addr1.clone()).expect("listener");
|
||||
/// match tcp1.select_next_some().await {
|
||||
/// TransportEvent::NewAddress { listen_addr, .. } => {
|
||||
@ -267,7 +257,7 @@ impl GenTcpConfig {
|
||||
/// _ => {}
|
||||
/// }
|
||||
///
|
||||
/// let mut tcp2 = TcpTransport::new(GenTcpConfig::new().port_reuse(true)).boxed();
|
||||
/// let mut tcp2 = libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::new().port_reuse(true)).boxed();
|
||||
/// tcp2.listen_on( listen_addr2).expect("listener");
|
||||
/// match tcp2.select_next_some().await {
|
||||
/// TransportEvent::NewAddress { listen_addr, .. } => {
|
||||
@ -286,7 +276,7 @@ impl GenTcpConfig {
|
||||
/// case, one is chosen whose IP protocol version and loopback status is the
|
||||
/// same as that of the remote address. Consequently, for maximum control of
|
||||
/// the local listening addresses and ports that are used for outgoing
|
||||
/// connections, a new [`GenTcpTransport`] should be created for each listening
|
||||
/// connections, a new [`Transport`] should be created for each listening
|
||||
/// socket, avoiding the use of wildcard addresses which bind a socket to
|
||||
/// all network interfaces.
|
||||
///
|
||||
@ -299,33 +289,48 @@ impl GenTcpConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for GenTcpConfig {
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct GenTcpTransport<T>
|
||||
/// An abstract [`libp2p_core::Transport`] implementation.
|
||||
///
|
||||
/// You shouldn't need to use this type directly. Use one of the following instead:
|
||||
///
|
||||
/// - [`tokio::Transport`]
|
||||
/// - [`async_io::Transport`]
|
||||
pub struct Transport<T>
|
||||
where
|
||||
T: Provider + Send,
|
||||
{
|
||||
config: GenTcpConfig,
|
||||
config: Config,
|
||||
|
||||
/// The configuration of port reuse when dialing.
|
||||
port_reuse: PortReuse,
|
||||
/// All the active listeners.
|
||||
/// The `TcpListenStream` struct contains a stream that we want to be pinned. Since the `VecDeque`
|
||||
/// The [`TcpListenStream`] struct contains a stream that we want to be pinned. Since the `VecDeque`
|
||||
/// can be resized, the only way is to use a `Pin<Box<>>`.
|
||||
listeners: VecDeque<Pin<Box<TcpListenStream<T>>>>,
|
||||
/// Pending transport events to return from [`GenTcpTransport::poll`].
|
||||
pending_events: VecDeque<TransportEvent<<Self as Transport>::ListenerUpgrade, io::Error>>,
|
||||
/// Pending transport events to return from [`libp2p_core::Transport::poll`].
|
||||
pending_events:
|
||||
VecDeque<TransportEvent<<Self as libp2p_core::Transport>::ListenerUpgrade, io::Error>>,
|
||||
}
|
||||
|
||||
impl<T> GenTcpTransport<T>
|
||||
impl<T> Transport<T>
|
||||
where
|
||||
T: Provider + Send,
|
||||
{
|
||||
pub fn new(config: GenTcpConfig) -> Self {
|
||||
/// Create a new instance of [`Transport`].
|
||||
///
|
||||
/// If you don't want to specify a [`Config`], use [`Transport::default`].
|
||||
///
|
||||
/// It is best to call this function through one of the type-aliases of this type:
|
||||
///
|
||||
/// - [`tokio::Transport::new`]
|
||||
/// - [`async_io::Transport::new`]
|
||||
pub fn new(config: Config) -> Self {
|
||||
let port_reuse = if config.enable_port_reuse {
|
||||
PortReuse::Enabled {
|
||||
listen_addrs: Arc::new(RwLock::new(HashSet::new())),
|
||||
@ -333,7 +338,7 @@ where
|
||||
} else {
|
||||
PortReuse::Disabled
|
||||
};
|
||||
GenTcpTransport {
|
||||
Transport {
|
||||
config,
|
||||
port_reuse,
|
||||
..Default::default()
|
||||
@ -395,12 +400,15 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for GenTcpTransport<T>
|
||||
impl<T> Default for Transport<T>
|
||||
where
|
||||
T: Provider + Send,
|
||||
{
|
||||
/// Creates a [`Transport`] with reasonable defaults.
|
||||
///
|
||||
/// This transport will have port-reuse disabled.
|
||||
fn default() -> Self {
|
||||
let config = GenTcpConfig::default();
|
||||
let config = Config::default();
|
||||
let port_reuse = if config.enable_port_reuse {
|
||||
PortReuse::Enabled {
|
||||
listen_addrs: Arc::new(RwLock::new(HashSet::new())),
|
||||
@ -408,7 +416,7 @@ where
|
||||
} else {
|
||||
PortReuse::Disabled
|
||||
};
|
||||
GenTcpTransport {
|
||||
Transport {
|
||||
port_reuse,
|
||||
config,
|
||||
listeners: VecDeque::new(),
|
||||
@ -417,7 +425,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Transport for GenTcpTransport<T>
|
||||
impl<T> libp2p_core::Transport for Transport<T>
|
||||
where
|
||||
T: Provider + Send + 'static,
|
||||
T::Listener: Unpin,
|
||||
@ -609,7 +617,7 @@ where
|
||||
|
||||
/// Event produced by a [`TcpListenStream`].
|
||||
#[derive(Debug)]
|
||||
pub enum TcpListenerEvent<S> {
|
||||
enum TcpListenerEvent<S> {
|
||||
/// The listener is listening on a new additional [`Multiaddr`].
|
||||
NewAddress(Multiaddr),
|
||||
/// An upgrade, consisting of the upgrade future, the listener address and the remote address.
|
||||
@ -631,7 +639,7 @@ pub enum TcpListenerEvent<S> {
|
||||
}
|
||||
|
||||
/// A stream of incoming connections on one or more interfaces.
|
||||
pub struct TcpListenStream<T>
|
||||
struct TcpListenStream<T>
|
||||
where
|
||||
T: Provider,
|
||||
{
|
||||
@ -851,6 +859,20 @@ fn is_tcp_addr(addr: &Multiaddr) -> bool {
|
||||
matches!(first, Ip4(_) | Ip6(_) | Dns(_) | Dns4(_) | Dns6(_)) && matches!(second, Tcp(_))
|
||||
}
|
||||
|
||||
/// The configuration for a TCP/IP transport capability for libp2p.
|
||||
#[deprecated(since = "0.37.0", note = "Use `Config` instead.")]
|
||||
pub type GenTcpConfig = Config;
|
||||
|
||||
/// The type of a [`Transport`](libp2p_core::Transport) using the `async-io` implementation.
|
||||
#[cfg(feature = "async-io")]
|
||||
#[deprecated(since = "0.37.0", note = "Use `async_io::Transport` instead.")]
|
||||
pub type TcpTransport = Transport<async_io::Tcp>;
|
||||
|
||||
/// The type of a [`Transport`](libp2p_core::Transport) using the `tokio` implementation.
|
||||
#[cfg(feature = "tokio")]
|
||||
#[deprecated(since = "0.37.0", note = "Use `tokio::Transport` instead.")]
|
||||
pub type TokioTcpTransport = Transport<tokio::Tcp>;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@ -859,6 +881,7 @@ mod tests {
|
||||
future::poll_fn,
|
||||
};
|
||||
use libp2p_core::PeerId;
|
||||
use libp2p_core::Transport as _;
|
||||
|
||||
#[test]
|
||||
fn multiaddr_to_tcp_conversion() {
|
||||
@ -914,7 +937,7 @@ mod tests {
|
||||
env_logger::try_init().ok();
|
||||
|
||||
async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
|
||||
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new()).boxed();
|
||||
let mut tcp = Transport::<T>::default().boxed();
|
||||
tcp.listen_on(addr).unwrap();
|
||||
loop {
|
||||
match tcp.select_next_some().await {
|
||||
@ -936,7 +959,7 @@ mod tests {
|
||||
|
||||
async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
|
||||
let addr = ready_rx.next().await.unwrap();
|
||||
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new());
|
||||
let mut tcp = Transport::<T>::default();
|
||||
|
||||
// Obtain a future socket through dialing
|
||||
let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
|
||||
@ -983,7 +1006,7 @@ mod tests {
|
||||
env_logger::try_init().ok();
|
||||
|
||||
async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
|
||||
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new()).boxed();
|
||||
let mut tcp = Transport::<T>::default().boxed();
|
||||
tcp.listen_on(addr).unwrap();
|
||||
|
||||
loop {
|
||||
@ -1012,7 +1035,7 @@ mod tests {
|
||||
|
||||
async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
|
||||
let dest_addr = ready_rx.next().await.unwrap();
|
||||
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new());
|
||||
let mut tcp = Transport::<T>::default();
|
||||
tcp.dial(dest_addr).unwrap().await.unwrap();
|
||||
}
|
||||
|
||||
@ -1056,7 +1079,7 @@ mod tests {
|
||||
mut ready_tx: mpsc::Sender<Multiaddr>,
|
||||
port_reuse_rx: oneshot::Receiver<Protocol<'_>>,
|
||||
) {
|
||||
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new()).boxed();
|
||||
let mut tcp = Transport::<T>::new(Config::new()).boxed();
|
||||
tcp.listen_on(addr).unwrap();
|
||||
loop {
|
||||
match tcp.select_next_some().await {
|
||||
@ -1091,7 +1114,7 @@ mod tests {
|
||||
port_reuse_tx: oneshot::Sender<Protocol<'_>>,
|
||||
) {
|
||||
let dest_addr = ready_rx.next().await.unwrap();
|
||||
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new().port_reuse(true));
|
||||
let mut tcp = Transport::<T>::new(Config::new().port_reuse(true));
|
||||
tcp.listen_on(addr).unwrap();
|
||||
match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
|
||||
TransportEvent::NewAddress { .. } => {
|
||||
@ -1159,7 +1182,7 @@ mod tests {
|
||||
env_logger::try_init().ok();
|
||||
|
||||
async fn listen_twice<T: Provider>(addr: Multiaddr) {
|
||||
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new().port_reuse(true));
|
||||
let mut tcp = Transport::<T>::new(Config::new().port_reuse(true));
|
||||
tcp.listen_on(addr).unwrap();
|
||||
match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
|
||||
TransportEvent::NewAddress {
|
||||
@ -1213,7 +1236,7 @@ mod tests {
|
||||
env_logger::try_init().ok();
|
||||
|
||||
async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
|
||||
let mut tcp = GenTcpTransport::<T>::new(GenTcpConfig::new()).boxed();
|
||||
let mut tcp = Transport::<T>::default().boxed();
|
||||
tcp.listen_on(addr).unwrap();
|
||||
tcp.select_next_some()
|
||||
.await
|
||||
@ -1250,13 +1273,13 @@ mod tests {
|
||||
fn test(addr: Multiaddr) {
|
||||
#[cfg(feature = "async-io")]
|
||||
{
|
||||
let mut tcp = TcpTransport::new(GenTcpConfig::new());
|
||||
let mut tcp = async_io::Transport::default();
|
||||
assert!(tcp.listen_on(addr.clone()).is_err());
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
{
|
||||
let mut tcp = TokioTcpTransport::new(GenTcpConfig::new());
|
||||
let mut tcp = tokio::Transport::default();
|
||||
assert!(tcp.listen_on(addr).is_err());
|
||||
}
|
||||
}
|
||||
@ -1264,13 +1287,23 @@ mod tests {
|
||||
test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "async-io", feature = "tcp"))]
|
||||
#[test]
|
||||
fn test_address_translation() {
|
||||
#[cfg(feature = "async-io")]
|
||||
let transport = TcpTransport::new(GenTcpConfig::new());
|
||||
#[cfg(all(feature = "tokio", not(feature = "async-io")))]
|
||||
let transport = TokioTcpTransport::new(GenTcpConfig::new());
|
||||
#[test]
|
||||
fn test_address_translation_async_io() {
|
||||
test_address_translation::<async_io::Transport>()
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
#[test]
|
||||
fn test_address_translation_tokio() {
|
||||
test_address_translation::<tokio::Transport>()
|
||||
}
|
||||
|
||||
fn test_address_translation<T>()
|
||||
where
|
||||
T: Default + libp2p_core::Transport,
|
||||
{
|
||||
let transport = T::default();
|
||||
|
||||
let port = 42;
|
||||
let tcp_listen_addr = Multiaddr::empty()
|
||||
|
@ -26,7 +26,30 @@ use std::io;
|
||||
use std::net;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
/// A TCP [`Transport`](libp2p_core::Transport) that works with the `async-std` ecosystem.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// # use libp2p_tcp as tcp;
|
||||
/// # use libp2p_core::Transport;
|
||||
/// # use futures::future;
|
||||
/// # use std::pin::Pin;
|
||||
/// #
|
||||
/// # #[async_std::main]
|
||||
/// # async fn main() {
|
||||
/// let mut transport = tcp::async_io::Transport::new(tcp::Config::default());
|
||||
/// let id = transport.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
|
||||
///
|
||||
/// let addr = future::poll_fn(|cx| Pin::new(&mut transport).poll(cx)).await.into_new_address().unwrap();
|
||||
///
|
||||
/// println!("Listening on {addr}");
|
||||
/// # }
|
||||
/// ```
|
||||
pub type Transport = crate::Transport<Tcp>;
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
#[doc(hidden)]
|
||||
pub enum Tcp {}
|
||||
|
||||
impl Provider for Tcp {
|
||||
|
@ -30,7 +30,31 @@ use std::net;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
/// A TCP [`Transport`](libp2p_core::Transport) that works with the `tokio` ecosystem.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// # use libp2p_tcp as tcp;
|
||||
/// # use libp2p_core::Transport;
|
||||
/// # use futures::future;
|
||||
/// # use std::pin::Pin;
|
||||
/// # use tokio_crate as tokio;
|
||||
/// #
|
||||
/// # #[tokio::main]
|
||||
/// # async fn main() {
|
||||
/// let mut transport = tcp::tokio::Transport::new(tcp::Config::default());
|
||||
/// let id = transport.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
|
||||
///
|
||||
/// let addr = future::poll_fn(|cx| Pin::new(&mut transport).poll(cx)).await.into_new_address().unwrap();
|
||||
///
|
||||
/// println!("Listening on {addr}");
|
||||
/// # }
|
||||
/// ```
|
||||
pub type Transport = crate::Transport<Tcp>;
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
#[doc(hidden)]
|
||||
pub enum Tcp {}
|
||||
|
||||
impl Provider for Tcp {
|
||||
|
@ -236,8 +236,8 @@ mod tests {
|
||||
futures::executor::block_on(connect(a))
|
||||
}
|
||||
|
||||
fn new_ws_config() -> WsConfig<tcp::TcpTransport> {
|
||||
WsConfig::new(tcp::TcpTransport::new(tcp::GenTcpConfig::default()))
|
||||
fn new_ws_config() -> WsConfig<tcp::async_io::Transport> {
|
||||
WsConfig::new(tcp::async_io::Transport::new(tcp::Config::default()))
|
||||
}
|
||||
|
||||
async fn connect(listen_addr: Multiaddr) {
|
||||
|
Reference in New Issue
Block a user