From d7363a53d3a3c1abdb53d92f96f1a931d38e1db4 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 13 Dec 2022 07:58:01 +1100 Subject: [PATCH] fix: Remove circular dependencies across workspace (#3023) Circular dependencies are problematic in several ways: - They result in cognitive overhead for developers, in trying to figure out what depends on what. - They present `cargo` with limits in what order the crates can be compiled in. - They invalidate build caches unnecessarily thus forcing `cargo` to rebuild certain crates. - They cause problems with tooling such as `release-please`. To actually break the circular dependencies, this patch inlines the uses of `development_transport` in the examples and tests for all sub-crates. This is only meant to be a short-term fix until https://github.com/libp2p/rust-libp2p/issues/3111 and https://github.com/libp2p/rust-libp2p/pull/2888 are fixed. To ensure we don't accidentally reintroduce this dependency, we add a basic CI that queries `cargo metadata` using `jq`. Resolves https://github.com/libp2p/rust-libp2p/issues/3053. Fixes https://github.com/libp2p/rust-libp2p/issues/3223. Related: https://github.com/libp2p/rust-libp2p/pull/2918#discussion_r976514245 Related: https://github.com/googleapis/release-please/issues/1662 --- .github/workflows/ci.yml | 5 + Cargo.toml | 2 +- core/Cargo.toml | 3 +- core/tests/transport_upgrade.rs | 10 +- misc/metrics/Cargo.toml | 12 +- misc/metrics/examples/metrics/main.rs | 25 +- misc/multistream-select/Cargo.toml | 5 +- misc/multistream-select/tests/transport.rs | 8 +- muxers/mplex/Cargo.toml | 3 +- muxers/mplex/benches/split_send_size.rs | 10 +- protocols/autonat/Cargo.toml | 8 +- protocols/autonat/examples/autonat_client.rs | 21 +- protocols/autonat/examples/autonat_server.rs | 20 +- protocols/autonat/tests/test_client.rs | 17 +- protocols/autonat/tests/test_server.rs | 24 +- protocols/dcutr/Cargo.toml | 14 +- protocols/dcutr/examples/dcutr.rs | 33 +- protocols/dcutr/tests/lib.rs | 30 +- protocols/gossipsub/Cargo.toml | 12 +- protocols/gossipsub/src/lib.rs | 4 +- protocols/gossipsub/tests/smoke.rs | 10 +- protocols/identify/Cargo.toml | 6 +- protocols/identify/examples/identify.rs | 10 +- protocols/identify/src/behaviour.rs | 6 +- protocols/identify/src/lib.rs | 8 +- protocols/identify/src/protocol.rs | 2 +- protocols/kad/Cargo.toml | 3 +- protocols/kad/src/behaviour/test.rs | 6 +- protocols/mdns/Cargo.toml | 6 +- protocols/mdns/src/behaviour/timer.rs | 15 +- protocols/mdns/tests/use-async-std.rs | 16 +- protocols/mdns/tests/use-tokio.rs | 16 +- protocols/ping/Cargo.toml | 6 +- protocols/ping/tests/ping.rs | 15 +- protocols/relay/Cargo.toml | 12 +- protocols/relay/examples/relay_v2.rs | 27 +- protocols/relay/tests/v2.rs | 36 +- protocols/rendezvous/Cargo.toml | 8 +- protocols/rendezvous/examples/discover.rs | 26 +- protocols/rendezvous/examples/register.rs | 23 +- .../examples/register_with_identify.rs | 25 +- .../rendezvous/examples/rendezvous_point.rs | 24 +- protocols/rendezvous/tests/harness.rs | 22 +- protocols/rendezvous/tests/rendezvous.rs | 14 +- protocols/request-response/Cargo.toml | 4 +- protocols/request-response/tests/ping.rs | 15 +- src/tutorials/ping.rs | 9 +- transports/deflate/Cargo.toml | 2 +- transports/deflate/tests/test.rs | 6 +- transports/dns/src/lib.rs | 2 +- transports/noise/Cargo.toml | 6 +- transports/noise/src/lib.rs | 4 +- transports/noise/tests/smoke.rs | 14 +- transports/quic/Cargo.toml | 4 +- transports/quic/tests/smoke.rs | 16 +- transports/tls/Cargo.toml | 4 +- transports/tls/tests/smoke.rs | 9 +- transports/webrtc/Cargo.toml | 4 +- transports/webrtc/examples/listen_ping.rs | 12 +- transports/webrtc/tests/smoke.rs | 722 ++++++++---------- transports/websocket/Cargo.toml | 4 +- transports/websocket/src/lib.rs | 4 +- 62 files changed, 746 insertions(+), 703 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0244c45e..5d9bdb2a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,6 +75,11 @@ jobs: cargo install cargo-semver-checks --locked cargo semver-checks check-release -p ${{ matrix.crate }} + - name: Enforce no dependency on meta crate + run: | + cargo metadata --format-version=1 --no-deps | \ + jq -e -r '.packages[] | select(.name == "${{ matrix.crate }}") | .dependencies | all(.name != "libp2p")' + cross: name: Compile on ${{ matrix.target }} strategy: diff --git a/Cargo.toml b/Cargo.toml index b5bc3894..eaed637a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,6 @@ identify = ["dep:libp2p-identify", "libp2p-metrics?/identify"] kad = ["dep:libp2p-kad", "libp2p-metrics?/kad"] macros = ["libp2p-swarm/macros"] mdns = ["dep:libp2p-mdns"] -tls = ["dep:libp2p-tls"] metrics = ["dep:libp2p-metrics"] mplex = ["dep:libp2p-mplex"] noise = ["dep:libp2p-noise"] @@ -76,6 +75,7 @@ rsa = ["libp2p-core/rsa"] secp256k1 = ["libp2p-core/secp256k1"] serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"] tcp = ["dep:libp2p-tcp"] +tls = ["dep:libp2p-tls"] tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio", "libp2p-webrtc?/tokio"] uds = ["dep:libp2p-uds"] wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js", "libp2p-swarm/wasm-bindgen"] diff --git a/core/Cargo.toml b/core/Cargo.toml index cb95d814..6bac1bc6 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -47,7 +47,8 @@ ring = { version = "0.16.9", features = ["alloc", "std"], default-features = fal async-std = { version = "1.6.2", features = ["attributes"] } base64 = "0.13.0" criterion = "0.4" -libp2p = { path = "..", features = ["full"] } +libp2p-mplex = { path = "../muxers/mplex" } +libp2p-noise = { path = "../transports/noise" } multihash = { version = "0.16", default-features = false, features = ["arb"] } quickcheck = { package = "quickcheck-ext", path = "../misc/quickcheck-ext" } rmp-serde = "1.0" diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index 9775e50a..dac84534 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -19,11 +19,11 @@ // DEALINGS IN THE SOFTWARE. use futures::prelude::*; -use libp2p::core::identity; -use libp2p::core::transport::{MemoryTransport, Transport}; -use libp2p::core::upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use libp2p::mplex::MplexConfig; -use libp2p::noise; +use libp2p_core::identity; +use libp2p_core::transport::{MemoryTransport, Transport}; +use libp2p_core::upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_mplex::MplexConfig; +use libp2p_noise as noise; use multiaddr::{Multiaddr, Protocol}; use rand::random; use std::{io, pin::Pin}; diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index 6a4e3e4d..f1dc7526 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -32,11 +32,15 @@ prometheus-client = "0.18.0" libp2p-gossipsub = { version = "0.44.0", path = "../../protocols/gossipsub", optional = true } [dev-dependencies] -log = "0.4.0" env_logger = "0.10.0" futures = "0.3.1" -libp2p = { path = "../..", features = ["full"] } hyper = { version="0.14", features = ["server", "tcp", "http1"] } +libp2p-noise = { path = "../../transports/noise" } +libp2p-ping = { path = "../../protocols/ping" } +libp2p-swarm = { path = "../../swarm", features = ["macros"] } +libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] } +libp2p-yamux = { path = "../../muxers/yamux" } +log = "0.4.0" tokio = { version = "1", features = ["rt-multi-thread"] } # Passing arguments to the docsrs builder in order to properly document cfg's. @@ -45,3 +49,7 @@ tokio = { version = "1", features = ["rt-multi-thread"] } all-features = true rustdoc-args = ["--cfg", "docsrs"] rustc-args = ["--cfg", "docsrs"] + +[[example]] +name = "metrics" +required-features = ["ping"] diff --git a/misc/metrics/examples/metrics/main.rs b/misc/metrics/examples/metrics/main.rs index 687dba3f..22c3bd2f 100644 --- a/misc/metrics/examples/metrics/main.rs +++ b/misc/metrics/examples/metrics/main.rs @@ -51,11 +51,17 @@ use env_logger::Env; use futures::executor::block_on; use futures::stream::StreamExt; -use libp2p::core::Multiaddr; -use libp2p::metrics::{Metrics, Recorder}; -use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; -use libp2p::{identify, identity, ping, PeerId, Swarm}; +use libp2p_core::{identity, upgrade::Version, Multiaddr, PeerId, Transport}; +use libp2p_identify as identify; +use libp2p_metrics::{Metrics, Recorder}; +use libp2p_noise as noise; +use libp2p_ping as ping; use libp2p_swarm::keep_alive; +use libp2p_swarm::NetworkBehaviour; +use libp2p_swarm::Swarm; +use libp2p_swarm::SwarmEvent; +use libp2p_tcp as tcp; +use libp2p_yamux as yamux; use log::info; use prometheus_client::registry::Registry; use std::error::Error; @@ -69,10 +75,14 @@ fn main() -> Result<(), Box> { let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); let local_pub_key = local_key.public(); - info!("Local peer id: {:?}", local_peer_id); + info!("Local peer id: {local_peer_id:?}"); let mut swarm = Swarm::without_executor( - block_on(libp2p::development_transport(local_key))?, + tcp::async_io::Transport::default() + .upgrade(Version::V1) + .authenticate(noise::NoiseAuthenticated::xx(&local_key)?) + .multiplex(yamux::YamuxConfig::default()) + .boxed(), Behaviour::new(local_pub_key), local_peer_id, ); @@ -115,6 +125,7 @@ fn main() -> Result<(), Box> { /// For illustrative purposes, this includes the [`keep_alive::Behaviour`]) behaviour so the ping actually happen /// and can be observed via the metrics. #[derive(NetworkBehaviour)] +#[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct Behaviour { identify: identify::Behaviour, keep_alive: keep_alive::Behaviour, @@ -122,7 +133,7 @@ struct Behaviour { } impl Behaviour { - fn new(local_pub_key: libp2p::identity::PublicKey) -> Self { + fn new(local_pub_key: identity::PublicKey) -> Self { Self { ping: ping::Behaviour::default(), identify: identify::Behaviour::new(identify::Config::new( diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index b0f5ef37..ef24c98e 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -21,7 +21,10 @@ unsigned-varint = "0.7" [dev-dependencies] async-std = "1.6.2" env_logger = "0.10" -libp2p = { path = "../..", features = ["full"] } +libp2p-core = { path = "../../core" } +libp2p-mplex = { path = "../../muxers/mplex" } +libp2p-plaintext = { path = "../../transports/plaintext" } +libp2p-swarm = { path = "../../swarm", features = ["async-std"] } quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } rand = "0.8" rw-stream-sink = { version = "0.3.0", path = "../../misc/rw-stream-sink" } diff --git a/misc/multistream-select/tests/transport.rs b/misc/multistream-select/tests/transport.rs index a66d20ea..77fcc1ca 100644 --- a/misc/multistream-select/tests/transport.rs +++ b/misc/multistream-select/tests/transport.rs @@ -19,16 +19,16 @@ // DEALINGS IN THE SOFTWARE. use futures::{channel::oneshot, prelude::*, ready}; -use libp2p::core::{ +use libp2p_core::{ identity, multiaddr::Protocol, muxing::StreamMuxerBox, transport::{self, MemoryTransport}, upgrade, Multiaddr, PeerId, Transport, }; -use libp2p::mplex::MplexConfig; -use libp2p::plaintext::PlainText2Config; -use libp2p::swarm::{dummy, Swarm, SwarmEvent}; +use libp2p_mplex::MplexConfig; +use libp2p_plaintext::PlainText2Config; +use libp2p_swarm::{dummy, Swarm, SwarmEvent}; use rand::random; use std::task::Poll; diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index 5c19db55..86adc614 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -27,8 +27,9 @@ async-std = { version = "1.7.0", features = ["attributes"] } criterion = "0.4" env_logger = "0.10" futures = "0.3" -libp2p = { path = "../..", features = ["full"] } libp2p-muxer-test-harness = { path = "../test-harness" } +libp2p-plaintext = { path = "../../transports/plaintext" } +libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] } quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } [[bench]] diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index 93172ca6..80eb09c5 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -26,12 +26,12 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughpu use futures::future::poll_fn; use futures::prelude::*; use futures::{channel::oneshot, future::join}; -use libp2p::core::muxing::StreamMuxerExt; -use libp2p::core::{ +use libp2p_core::muxing::StreamMuxerExt; +use libp2p_core::{ identity, multiaddr::multiaddr, muxing, transport, upgrade, Multiaddr, PeerId, Transport, }; -use libp2p::plaintext::PlainText2Config; -use libp2p::{mplex, tcp}; +use libp2p_mplex as mplex; +use libp2p_plaintext::PlainText2Config; use std::pin::Pin; use std::time::Duration; @@ -169,7 +169,7 @@ fn tcp_transport(split_send_size: usize) -> BenchTransport { let mut mplex = mplex::MplexConfig::default(); mplex.set_split_send_size(split_send_size); - tcp::async_io::Transport::new(tcp::Config::default().nodelay(true)) + libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::default().nodelay(true)) .upgrade(upgrade::Version::V1) .authenticate(PlainText2Config { local_public_key }) .multiplex(mplex) diff --git a/protocols/autonat/Cargo.toml b/protocols/autonat/Cargo.toml index eb4d12b0..6fbad657 100644 --- a/protocols/autonat/Cargo.toml +++ b/protocols/autonat/Cargo.toml @@ -27,9 +27,13 @@ prost = "0.11" [dev-dependencies] async-std = { version = "1.10", features = ["attributes"] } -env_logger = "0.10" clap = { version = "4.0.13", features = ["derive"] } -libp2p = { path = "../..", features = ["full"] } +env_logger = "0.10" +libp2p-identify = { path = "../identify" } +libp2p-noise = { path = "../../transports/noise" } +libp2p-swarm = { path = "../../swarm", features = ["async-std", "macros"] } +libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] } +libp2p-yamux = { path = "../../muxers/yamux" } # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/autonat/examples/autonat_client.rs b/protocols/autonat/examples/autonat_client.rs index 1c897620..c124dcf9 100644 --- a/protocols/autonat/examples/autonat_client.rs +++ b/protocols/autonat/examples/autonat_client.rs @@ -31,11 +31,14 @@ use clap::Parser; use futures::prelude::*; -use libp2p::autonat; -use libp2p::identify; -use libp2p::multiaddr::Protocol; -use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::{identity, Multiaddr, PeerId}; +use libp2p_autonat as autonat; +use libp2p_core::multiaddr::Protocol; +use libp2p_core::{identity, upgrade::Version, Multiaddr, PeerId, Transport}; +use libp2p_identify as identify; +use libp2p_noise as noise; +use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_tcp as tcp; +use libp2p_yamux as yamux; use std::error::Error; use std::net::Ipv4Addr; use std::time::Duration; @@ -63,7 +66,11 @@ async fn main() -> Result<(), Box> { let local_peer_id = PeerId::from(local_key.public()); println!("Local peer id: {:?}", local_peer_id); - let transport = libp2p::development_transport(local_key.clone()).await?; + let transport = tcp::async_io::Transport::default() + .upgrade(Version::V1) + .authenticate(noise::NoiseAuthenticated::xx(&local_key)?) + .multiplex(yamux::YamuxConfig::default()) + .boxed(); let behaviour = Behaviour::new(local_key.public()); @@ -89,7 +96,7 @@ async fn main() -> Result<(), Box> { } #[derive(NetworkBehaviour)] -#[behaviour(out_event = "Event")] +#[behaviour(out_event = "Event", prelude = "libp2p_swarm::derive_prelude")] struct Behaviour { identify: identify::Behaviour, auto_nat: autonat::Behaviour, diff --git a/protocols/autonat/examples/autonat_server.rs b/protocols/autonat/examples/autonat_server.rs index a3bcda1e..a5e98012 100644 --- a/protocols/autonat/examples/autonat_server.rs +++ b/protocols/autonat/examples/autonat_server.rs @@ -28,11 +28,13 @@ use clap::Parser; use futures::prelude::*; -use libp2p::autonat; -use libp2p::identify; -use libp2p::multiaddr::Protocol; -use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::{identity, Multiaddr, PeerId}; +use libp2p_autonat as autonat; +use libp2p_core::{identity, multiaddr::Protocol, upgrade::Version, Multiaddr, PeerId, Transport}; +use libp2p_identify as identify; +use libp2p_noise as noise; +use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_tcp as tcp; +use libp2p_yamux as yamux; use std::error::Error; use std::net::Ipv4Addr; @@ -53,7 +55,11 @@ async fn main() -> Result<(), Box> { let local_peer_id = PeerId::from(local_key.public()); println!("Local peer id: {:?}", local_peer_id); - let transport = libp2p::development_transport(local_key.clone()).await?; + let transport = tcp::async_io::Transport::default() + .upgrade(Version::V1) + .authenticate(noise::NoiseAuthenticated::xx(&local_key)?) + .multiplex(yamux::YamuxConfig::default()) + .boxed(); let behaviour = Behaviour::new(local_key.public()); @@ -74,7 +80,7 @@ async fn main() -> Result<(), Box> { } #[derive(NetworkBehaviour)] -#[behaviour(out_event = "Event")] +#[behaviour(out_event = "Event", prelude = "libp2p_swarm::derive_prelude")] struct Behaviour { identify: identify::Behaviour, auto_nat: autonat::Behaviour, diff --git a/protocols/autonat/tests/test_client.rs b/protocols/autonat/tests/test_client.rs index 5e23304e..1fd03925 100644 --- a/protocols/autonat/tests/test_client.rs +++ b/protocols/autonat/tests/test_client.rs @@ -20,15 +20,14 @@ use futures::{channel::oneshot, Future, FutureExt, StreamExt}; use futures_timer::Delay; -use libp2p::{ - development_transport, - identity::Keypair, - swarm::{AddressScore, Swarm, SwarmEvent}, - Multiaddr, PeerId, -}; use libp2p_autonat::{ Behaviour, Config, Event, NatStatus, OutboundProbeError, OutboundProbeEvent, ResponseError, }; +use libp2p_core::{identity::Keypair, upgrade::Version, Multiaddr, PeerId, Transport}; +use libp2p_noise as noise; +use libp2p_swarm::{AddressScore, Swarm, SwarmEvent}; +use libp2p_tcp as tcp; +use libp2p_yamux as yamux; use std::time::Duration; const MAX_CONFIDENCE: usize = 3; @@ -38,7 +37,11 @@ const TEST_REFRESH_INTERVAL: Duration = Duration::from_secs(2); async fn init_swarm(config: Config) -> Swarm { let keypair = Keypair::generate_ed25519(); let local_id = PeerId::from_public_key(&keypair.public()); - let transport = development_transport(keypair).await.unwrap(); + let transport = tcp::async_io::Transport::default() + .upgrade(Version::V1) + .authenticate(noise::NoiseAuthenticated::xx(&keypair).unwrap()) + .multiplex(yamux::YamuxConfig::default()) + .boxed(); let behaviour = Behaviour::new(local_id, config); Swarm::with_async_std_executor(transport, behaviour, local_id) } diff --git a/protocols/autonat/tests/test_server.rs b/protocols/autonat/tests/test_server.rs index 57d20ef2..140bb928 100644 --- a/protocols/autonat/tests/test_server.rs +++ b/protocols/autonat/tests/test_server.rs @@ -20,24 +20,28 @@ use futures::{channel::oneshot, Future, FutureExt, StreamExt}; use futures_timer::Delay; -use libp2p::core::{ConnectedPoint, Endpoint}; -use libp2p::swarm::DialError; -use libp2p::{ - development_transport, - identity::Keypair, - multiaddr::Protocol, - swarm::{AddressScore, Swarm, SwarmEvent}, - Multiaddr, PeerId, -}; use libp2p_autonat::{ Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, ResponseError, }; +use libp2p_core::{ + identity::Keypair, multiaddr::Protocol, upgrade::Version, ConnectedPoint, Endpoint, Multiaddr, + PeerId, Transport, +}; +use libp2p_noise as noise; +use libp2p_swarm::DialError; +use libp2p_swarm::{AddressScore, Swarm, SwarmEvent}; +use libp2p_tcp as tcp; +use libp2p_yamux as yamux; use std::{num::NonZeroU32, time::Duration}; async fn init_swarm(config: Config) -> Swarm { let keypair = Keypair::generate_ed25519(); let local_id = PeerId::from_public_key(&keypair.public()); - let transport = development_transport(keypair).await.unwrap(); + let transport = tcp::async_io::Transport::default() + .upgrade(Version::V1) + .authenticate(noise::NoiseAuthenticated::xx(&keypair).unwrap()) + .multiplex(yamux::YamuxConfig::default()) + .boxed(); let behaviour = Behaviour::new(local_id, config); Swarm::with_async_std_executor(transport, behaviour, local_id) } diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index e3099ac1..92d932a9 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -29,10 +29,18 @@ void = "1" prost-build = "0.11" [dev-dependencies] -env_logger = "0.10.0" -libp2p = { path = "../..", features = ["full"] } -rand = "0.8" clap = { version = "4.0.13", features = ["derive"] } +env_logger = "0.10.0" +libp2p-dns = { path = "../../transports/dns", features = ["async-std"] } +libp2p-identify = { path = "../../protocols/identify" } +libp2p-noise = { path = "../../transports/noise" } +libp2p-ping = { path = "../../protocols/ping" } +libp2p-plaintext = { path = "../../transports/plaintext" } +libp2p-relay = { path = "../relay" } +libp2p-swarm = { path = "../../swarm", features = ["macros"] } +libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] } +libp2p-yamux = { path = "../../muxers/yamux" } +rand = "0.8" # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/dcutr/examples/dcutr.rs b/protocols/dcutr/examples/dcutr.rs index e12ff4cc..f543db25 100644 --- a/protocols/dcutr/examples/dcutr.rs +++ b/protocols/dcutr/examples/dcutr.rs @@ -22,18 +22,19 @@ use clap::Parser; use futures::executor::{block_on, ThreadPool}; use futures::future::FutureExt; use futures::stream::StreamExt; -use libp2p::core::multiaddr::{Multiaddr, Protocol}; -use libp2p::core::transport::OrTransport; -use libp2p::core::upgrade; -use libp2p::dns::DnsConfig; -use libp2p::identify; -use libp2p::noise; -use libp2p::relay::v2::client::{self, Client}; -use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; -use libp2p::tcp; -use libp2p::Transport; -use libp2p::{dcutr, ping}; -use libp2p::{identity, PeerId}; +use libp2p_core::multiaddr::{Multiaddr, Protocol}; +use libp2p_core::transport::OrTransport; +use libp2p_core::upgrade; +use libp2p_core::Transport; +use libp2p_core::{identity, PeerId}; +use libp2p_dcutr as dcutr; +use libp2p_dns::DnsConfig; +use libp2p_identify as identify; +use libp2p_noise as noise; +use libp2p_ping as ping; +use libp2p_relay::v2::client::{self, Client}; +use libp2p_swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; +use libp2p_tcp as tcp; use log::info; use std::convert::TryInto; use std::error::Error; @@ -100,11 +101,15 @@ fn main() -> Result<(), Box> { noise::NoiseAuthenticated::xx(&local_key) .expect("Signing libp2p-noise static DH keypair failed."), ) - .multiplex(libp2p::yamux::YamuxConfig::default()) + .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(); #[derive(NetworkBehaviour)] - #[behaviour(out_event = "Event", event_process = false)] + #[behaviour( + out_event = "Event", + event_process = false, + prelude = "libp2p_swarm::derive_prelude" + )] struct Behaviour { relay_client: Client, ping: ping::Behaviour, diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 8c687835..ac949f0e 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -23,17 +23,17 @@ use futures::future::FutureExt; use futures::io::{AsyncRead, AsyncWrite}; use futures::stream::StreamExt; use futures::task::Spawn; -use libp2p::core::multiaddr::{Multiaddr, Protocol}; -use libp2p::core::muxing::StreamMuxerBox; -use libp2p::core::transport::upgrade::Version; -use libp2p::core::transport::{Boxed, MemoryTransport, OrTransport, Transport}; -use libp2p::core::PublicKey; -use libp2p::core::{identity, PeerId}; -use libp2p::dcutr; -use libp2p::plaintext::PlainText2Config; -use libp2p::relay::v2::client; -use libp2p::relay::v2::relay; -use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_core::multiaddr::{Multiaddr, Protocol}; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::transport::upgrade::Version; +use libp2p_core::transport::{Boxed, MemoryTransport, OrTransport, Transport}; +use libp2p_core::PublicKey; +use libp2p_core::{identity, PeerId}; +use libp2p_dcutr as dcutr; +use libp2p_plaintext::PlainText2Config; +use libp2p_relay::v2::client; +use libp2p_relay::v2::relay; +use libp2p_swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; use std::time::Duration; #[test] @@ -142,12 +142,16 @@ where transport .upgrade(Version::V1) .authenticate(PlainText2Config { local_public_key }) - .multiplex(libp2p::yamux::YamuxConfig::default()) + .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed() } #[derive(NetworkBehaviour)] -#[behaviour(out_event = "ClientEvent", event_process = false)] +#[behaviour( + out_event = "ClientEvent", + event_process = false, + prelude = "libp2p_swarm::derive_prelude" +)] struct Client { relay: client::Client, dcutr: dcutr::behaviour::Behaviour, diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 08ae497c..62ba9df9 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -37,11 +37,15 @@ prometheus-client = "0.18.0" [dev-dependencies] async-std = "1.6.3" -env_logger = "0.10.0" -libp2p = { path = "../..", features = ["full"] } -quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } -hex = "0.4.2" derive_builder = "0.11.1" +env_logger = "0.10.0" +hex = "0.4.2" +libp2p-mplex = { path = "../../muxers/mplex" } +libp2p-noise = { path = "../../transports/noise" } +libp2p-plaintext = { path = "../../transports/plaintext" } +libp2p-swarm = { path = "../../swarm" } +libp2p-yamux = { path = "../../muxers/yamux" } +quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } [build-dependencies] prost-build = "0.11" diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index faf0b050..bd520aa9 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -101,8 +101,8 @@ //! // This is test transport (memory). //! let transport = MemoryTransport::default() //! .upgrade(libp2p_core::upgrade::Version::V1) -//! .authenticate(libp2p::noise::NoiseAuthenticated::xx(&local_key).unwrap()) -//! .multiplex(libp2p::mplex::MplexConfig::new()) +//! .authenticate(libp2p_noise::NoiseAuthenticated::xx(&local_key).unwrap()) +//! .multiplex(libp2p_mplex::MplexConfig::new()) //! .boxed(); //! //! // Create a Gossipsub topic diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index db99179b..b89a37a2 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -29,16 +29,16 @@ use std::{ }; use futures::StreamExt; -use libp2p::core::{ +use libp2p_core::{ identity, multiaddr::Protocol, transport::MemoryTransport, upgrade, Multiaddr, Transport, }; -use libp2p::gossipsub::{ +use libp2p_gossipsub::{ Gossipsub, GossipsubConfigBuilder, GossipsubEvent, IdentTopic as Topic, MessageAuthenticity, ValidationMode, }; -use libp2p::plaintext::PlainText2Config; -use libp2p::swarm::{Swarm, SwarmEvent}; -use libp2p::yamux; +use libp2p_plaintext::PlainText2Config; +use libp2p_swarm::{Swarm, SwarmEvent}; +use libp2p_yamux as yamux; struct Graph { pub nodes: Vec<(Multiaddr, Swarm)>, diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 477822d8..c28e7b20 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -27,7 +27,11 @@ void = "1.0" [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.10" -libp2p = { path = "../..", features = ["full"] } +libp2p-mplex = { path = "../../muxers/mplex" } +libp2p-yamux = { path = "../../muxers/yamux" } +libp2p-noise = { path = "../../transports/noise" } +libp2p-swarm = { path = "../../swarm", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] } [build-dependencies] prost-build = "0.11" diff --git a/protocols/identify/examples/identify.rs b/protocols/identify/examples/identify.rs index 6f5fb2a1..b7962b40 100644 --- a/protocols/identify/examples/identify.rs +++ b/protocols/identify/examples/identify.rs @@ -37,7 +37,9 @@ //! and will send each other identify info which is then printed to the console. use futures::prelude::*; -use libp2p::{identify, identity, Multiaddr, PeerId}; +use libp2p_core::upgrade::Version; +use libp2p_core::{identity, Multiaddr, PeerId, Transport}; +use libp2p_identify as identify; use libp2p_swarm::{Swarm, SwarmEvent}; use std::error::Error; @@ -47,7 +49,11 @@ async fn main() -> Result<(), Box> { let local_peer_id = PeerId::from(local_key.public()); println!("Local peer id: {:?}", local_peer_id); - let transport = libp2p::development_transport(local_key.clone()).await?; + let transport = libp2p_tcp::async_io::Transport::default() + .upgrade(Version::V1) + .authenticate(libp2p_noise::NoiseAuthenticated::xx(&local_key).unwrap()) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(); // Create a identify network behaviour. let behaviour = identify::Behaviour::new(identify::Config::new( diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 10bfab3e..0e53470f 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -557,11 +557,11 @@ impl PeerCache { mod tests { use super::*; use futures::pin_mut; - use libp2p::mplex::MplexConfig; - use libp2p::noise; - use libp2p::tcp; use libp2p_core::{identity, muxing::StreamMuxerBox, transport, upgrade, PeerId, Transport}; + use libp2p_mplex::MplexConfig; + use libp2p_noise as noise; use libp2p_swarm::{Swarm, SwarmEvent}; + use libp2p_tcp as tcp; use std::time::Duration; fn transport() -> ( diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index e4748eba..0155e1b6 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -46,22 +46,22 @@ pub use self::protocol::{Info, UpgradeError, PROTOCOL_NAME, PUSH_PROTOCOL_NAME}; #[deprecated( since = "0.40.0", - note = "Use re-exports that omit `Identify` prefix, i.e. `libp2p::identify::Config`" + note = "Use re-exports that omit `Identify` prefix, i.e. `libp2p_identify::Config`" )] pub type IdentifyConfig = Config; #[deprecated( since = "0.40.0", - note = "Use re-exports that omit `Identify` prefix, i.e. `libp2p::identify::Event`" + note = "Use re-exports that omit `Identify` prefix, i.e. `libp2p_identify::Event`" )] pub type IdentifyEvent = Event; -#[deprecated(since = "0.40.0", note = "Use libp2p::identify::Behaviour instead.")] +#[deprecated(since = "0.40.0", note = "Use libp2p_identify::Behaviour instead.")] pub type Identify = Behaviour; #[deprecated( since = "0.40.0", - note = "Use re-exports that omit `Identify` prefix, i.e. `libp2p::identify::Info`" + note = "Use re-exports that omit `Identify` prefix, i.e. `libp2p_identify::Info`" )] pub type IdentifyInfo = Info; diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 1a969c6a..f3062256 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -275,12 +275,12 @@ pub enum UpgradeError { mod tests { use super::*; use futures::channel::oneshot; - use libp2p::tcp; use libp2p_core::{ identity, upgrade::{self, apply_inbound, apply_outbound}, Transport, }; + use libp2p_tcp as tcp; #[test] fn correct_transfer() { diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 8afeea44..bf7a456d 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -35,7 +35,8 @@ thiserror = "1" [dev-dependencies] env_logger = "0.10.0" futures-timer = "3.0" -libp2p = { path = "../..", features = ["full"] } +libp2p-noise = { path = "../../transports/noise" } +libp2p-yamux = { path = "../../muxers/yamux" } quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } [build-dependencies] diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 00f8816e..4808dd65 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -27,9 +27,6 @@ use crate::record::{store::MemoryStore, Key}; use crate::K_VALUE; use futures::{executor::block_on, future::poll_fn, prelude::*}; use futures_timer::Delay; -use libp2p::noise; -use libp2p::swarm::{Swarm, SwarmEvent}; -use libp2p::yamux; use libp2p_core::{ connection::{ConnectedPoint, ConnectionId}, identity, @@ -38,6 +35,9 @@ use libp2p_core::{ transport::MemoryTransport, upgrade, Endpoint, PeerId, Transport, }; +use libp2p_noise as noise; +use libp2p_swarm::{Swarm, SwarmEvent}; +use libp2p_yamux as yamux; use quickcheck::*; use rand::{random, rngs::StdRng, thread_rng, Rng, SeedableRng}; use std::{ diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 2685d9a4..749348e5 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -32,10 +32,12 @@ async-io = ["dep:async-io", "if-watch/smol"] [dev-dependencies] async-std = { version = "1.9.0", features = ["attributes"] } env_logger = "0.10.0" -libp2p = { path = "../..", features = ["full"] } +libp2p-noise = { path = "../../transports/noise" } +libp2p-swarm = { path = "../../swarm", features = ["tokio", "async-std"] } +libp2p-tcp = { path = "../../transports/tcp", features = ["tokio", "async-io"] } +libp2p-yamux = { path = "../../muxers/yamux" } tokio = { version = "1.19", default-features = false, features = ["macros", "rt", "rt-multi-thread", "time"] } - [[test]] name = "use-async-std" required-features = ["async-io"] diff --git a/protocols/mdns/src/behaviour/timer.rs b/protocols/mdns/src/behaviour/timer.rs index fbdeb065..b5034592 100644 --- a/protocols/mdns/src/behaviour/timer.rs +++ b/protocols/mdns/src/behaviour/timer.rs @@ -20,18 +20,17 @@ use std::{ marker::Unpin, - pin::Pin, - task::{Context, Poll}, time::{Duration, Instant}, }; -/// Simple wrapper for the differents type of timers +/// Simple wrapper for the different type of timers #[derive(Debug)] +#[cfg(any(feature = "async-io", feature = "tokio"))] pub struct Timer { inner: T, } -/// Builder interface to homogenize the differents implementations +/// Builder interface to homogenize the different implementations pub trait Builder: Send + Unpin + 'static { /// Creates a timer that emits an event once at the given time instant. fn at(instant: Instant) -> Self; @@ -48,6 +47,10 @@ pub mod asio { use super::*; use async_io::Timer as AsioTimer; use futures::Stream; + use std::{ + pin::Pin, + task::{Context, Poll}, + }; /// Async Timer pub type AsyncTimer = Timer; @@ -86,6 +89,10 @@ pub mod tokio { use super::*; use ::tokio::time::{self, Instant as TokioInstant, Interval, MissedTickBehavior}; use futures::Stream; + use std::{ + pin::Pin, + task::{Context, Poll}, + }; /// Tokio wrapper pub type TokioTimer = Timer; diff --git a/protocols/mdns/tests/use-async-std.rs b/protocols/mdns/tests/use-async-std.rs index 67e1d68c..736371a5 100644 --- a/protocols/mdns/tests/use-async-std.rs +++ b/protocols/mdns/tests/use-async-std.rs @@ -19,12 +19,10 @@ // DEALINGS IN THE SOFTWARE.use futures::StreamExt; use futures::StreamExt; -use libp2p::{ - identity, - mdns::{async_io::Behaviour, Config, Event}, - swarm::{Swarm, SwarmEvent}, - PeerId, -}; +use libp2p_core::{identity, upgrade::Version, PeerId, Transport}; +use libp2p_mdns::Event; +use libp2p_mdns::{async_io::Behaviour, Config}; +use libp2p_swarm::{Swarm, SwarmEvent}; use std::error::Error; use std::time::Duration; @@ -60,7 +58,11 @@ async fn test_expired_async_std() -> Result<(), Box> { async fn create_swarm(config: Config) -> Result, Box> { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(id_keys.public()); - let transport = libp2p::development_transport(id_keys).await?; + let transport = libp2p_tcp::async_io::Transport::default() + .upgrade(Version::V1) + .authenticate(libp2p_noise::NoiseAuthenticated::xx(&id_keys).unwrap()) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(); let behaviour = Behaviour::new(config)?; let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; diff --git a/protocols/mdns/tests/use-tokio.rs b/protocols/mdns/tests/use-tokio.rs index 3fcf9897..1a48e8c7 100644 --- a/protocols/mdns/tests/use-tokio.rs +++ b/protocols/mdns/tests/use-tokio.rs @@ -18,12 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE.use futures::StreamExt; use futures::StreamExt; -use libp2p::{ - identity, - mdns::{tokio::Behaviour, Config, Event}, - swarm::{Swarm, SwarmEvent}, - PeerId, -}; +use libp2p_core::{identity, upgrade::Version, PeerId, Transport}; +use libp2p_mdns::{tokio::Behaviour, Config, Event}; +use libp2p_swarm::Swarm; +use libp2p_swarm::SwarmEvent; use std::error::Error; use std::time::Duration; @@ -56,7 +54,11 @@ async fn test_expired_tokio() -> Result<(), Box> { async fn create_swarm(config: Config) -> Result, Box> { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(id_keys.public()); - let transport = libp2p::tokio_development_transport(id_keys)?; + let transport = libp2p_tcp::tokio::Transport::default() + .upgrade(Version::V1) + .authenticate(libp2p_noise::NoiseAuthenticated::xx(&id_keys).unwrap()) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(); let behaviour = Behaviour::new(config)?; let mut swarm = Swarm::with_tokio_executor(transport, behaviour, peer_id); swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index aab67b8e..04e708d1 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -22,7 +22,11 @@ void = "1.0" [dev-dependencies] async-std = "1.6.2" -libp2p = { path = "../..", features = ["full"] } +libp2p-mplex = { path = "../../muxers/mplex" } +libp2p-noise = { path = "../../transports/noise" } +libp2p-swarm = { path = "../../swarm", features = ["async-std", "macros"] } +libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] } +libp2p-yamux = { path = "../../muxers/yamux" } quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } # Passing arguments to the docsrs builder in order to properly document cfg's. diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index d33bb962..9939dc49 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -21,19 +21,19 @@ //! Integration tests for the `Ping` network behaviour. use futures::{channel::mpsc, prelude::*}; -use libp2p::core::{ +use libp2p_core::{ identity, muxing::StreamMuxerBox, transport::{self, Transport}, upgrade, Multiaddr, PeerId, }; -use libp2p::mplex; -use libp2p::noise; -use libp2p::ping; -use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::tcp; -use libp2p::yamux; +use libp2p_mplex as mplex; +use libp2p_noise as noise; +use libp2p_ping as ping; use libp2p_swarm::keep_alive; +use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_tcp as tcp; +use libp2p_yamux as yamux; use quickcheck::*; use std::{num::NonZeroU8, time::Duration}; @@ -271,6 +271,7 @@ impl Arbitrary for MuxerChoice { } #[derive(NetworkBehaviour, Default)] +#[behaviour(prelude = "libp2p_swarm::derive_prelude")] struct Behaviour { keep_alive: keep_alive::Behaviour, ping: ping::Behaviour, diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index 506d1396..69e5d3ba 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -33,10 +33,16 @@ void = "1" prost-build = "0.11" [dev-dependencies] -env_logger = "0.10.0" -libp2p = { path = "../..", features = ["full"] } -quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } clap = { version = "4.0.13", features = ["derive"] } +env_logger = "0.10.0" +libp2p-identify = { path = "../../protocols/identify" } +libp2p-noise = { path = "../../transports/noise" } +libp2p-ping = { path = "../../protocols/ping" } +libp2p-plaintext = { path = "../../transports/plaintext" } +libp2p-swarm = { path = "../../swarm", features = ["macros"] } +libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] } +libp2p-yamux = { path = "../../muxers/yamux" } +quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/relay/examples/relay_v2.rs b/protocols/relay/examples/relay_v2.rs index 137132e6..575b8a70 100644 --- a/protocols/relay/examples/relay_v2.rs +++ b/protocols/relay/examples/relay_v2.rs @@ -22,16 +22,15 @@ use clap::Parser; use futures::executor::block_on; use futures::stream::StreamExt; -use libp2p::core::upgrade; -use libp2p::identify; -use libp2p::multiaddr::Protocol; -use libp2p::ping; -use libp2p::relay::v2::relay::{self, Relay}; -use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::tcp; -use libp2p::Transport; -use libp2p::{identity, PeerId}; -use libp2p::{noise, Multiaddr}; +use libp2p_core::multiaddr::Protocol; +use libp2p_core::upgrade; +use libp2p_core::{identity, Multiaddr, PeerId, Transport}; +use libp2p_identify as identify; +use libp2p_noise as noise; +use libp2p_ping as ping; +use libp2p_relay::v2::relay::{self, Relay}; +use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_tcp as tcp; use std::error::Error; use std::net::{Ipv4Addr, Ipv6Addr}; @@ -54,7 +53,7 @@ fn main() -> Result<(), Box> { noise::NoiseAuthenticated::xx(&local_key) .expect("Signing libp2p-noise static DH keypair failed."), ) - .multiplex(libp2p::yamux::YamuxConfig::default()) + .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(); let behaviour = Behaviour { @@ -93,7 +92,11 @@ fn main() -> Result<(), Box> { } #[derive(NetworkBehaviour)] -#[behaviour(out_event = "Event", event_process = false)] +#[behaviour( + out_event = "Event", + event_process = false, + prelude = "libp2p_swarm::derive_prelude" +)] struct Behaviour { relay: Relay, ping: ping::Behaviour, diff --git a/protocols/relay/tests/v2.rs b/protocols/relay/tests/v2.rs index b31d6866..07383a3c 100644 --- a/protocols/relay/tests/v2.rs +++ b/protocols/relay/tests/v2.rs @@ -23,17 +23,17 @@ use futures::future::FutureExt; use futures::io::{AsyncRead, AsyncWrite}; use futures::stream::StreamExt; use futures::task::Spawn; -use libp2p::core::multiaddr::{Multiaddr, Protocol}; -use libp2p::core::muxing::StreamMuxerBox; -use libp2p::core::transport::choice::OrTransport; -use libp2p::core::transport::{Boxed, MemoryTransport, Transport}; -use libp2p::core::PublicKey; -use libp2p::core::{identity, upgrade, PeerId}; -use libp2p::ping; -use libp2p::plaintext::PlainText2Config; -use libp2p::relay::v2::client; -use libp2p::relay::v2::relay; -use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_core::multiaddr::{Multiaddr, Protocol}; +use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::transport::choice::OrTransport; +use libp2p_core::transport::{Boxed, MemoryTransport, Transport}; +use libp2p_core::PublicKey; +use libp2p_core::{identity, upgrade, PeerId}; +use libp2p_ping as ping; +use libp2p_plaintext::PlainText2Config; +use libp2p_relay::v2::client; +use libp2p_relay::v2::relay; +use libp2p_swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; use std::time::Duration; #[test] @@ -338,12 +338,16 @@ where transport .upgrade(upgrade::Version::V1) .authenticate(PlainText2Config { local_public_key }) - .multiplex(libp2p::yamux::YamuxConfig::default()) + .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed() } #[derive(NetworkBehaviour)] -#[behaviour(out_event = "RelayEvent", event_process = false)] +#[behaviour( + out_event = "RelayEvent", + event_process = false, + prelude = "libp2p_swarm::derive_prelude" +)] struct Relay { relay: relay::Relay, ping: ping::Behaviour, @@ -368,7 +372,11 @@ impl From for RelayEvent { } #[derive(NetworkBehaviour)] -#[behaviour(out_event = "ClientEvent", event_process = false)] +#[behaviour( + out_event = "ClientEvent", + event_process = false, + prelude = "libp2p_swarm::derive_prelude" +)] struct Client { relay: client::Client, ping: ping::Behaviour, diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml index 7ddfc7ad..9a3522ce 100644 --- a/protocols/rendezvous/Cargo.toml +++ b/protocols/rendezvous/Cargo.toml @@ -30,7 +30,13 @@ void = "1" [dev-dependencies] async-trait = "0.1" env_logger = "0.10.0" -libp2p = { path = "../..", features = ["full"] } +libp2p-swarm = { path = "../../swarm", features = ["macros", "tokio"] } +libp2p-mplex = { path = "../../muxers/mplex" } +libp2p-noise = { path = "../../transports/noise" } +libp2p-ping = { path = "../ping" } +libp2p-identify = { path = "../identify" } +libp2p-yamux = { path = "../../muxers/yamux" } +libp2p-tcp = { path = "../../transports/tcp", features = ["tokio"] } rand = "0.8" tokio = { version = "1.15", features = [ "rt-multi-thread", "time", "macros", "sync", "process", "fs", "net" ] } diff --git a/protocols/rendezvous/examples/discover.rs b/protocols/rendezvous/examples/discover.rs index d400301b..274432bc 100644 --- a/protocols/rendezvous/examples/discover.rs +++ b/protocols/rendezvous/examples/discover.rs @@ -19,13 +19,10 @@ // DEALINGS IN THE SOFTWARE. use futures::StreamExt; -use libp2p::core::identity; -use libp2p::core::PeerId; -use libp2p::multiaddr::Protocol; -use libp2p::ping; -use libp2p::swarm::{keep_alive, SwarmEvent}; -use libp2p::Swarm; -use libp2p::{rendezvous, tokio_development_transport, Multiaddr}; +use libp2p_core::{identity, multiaddr::Protocol, upgrade::Version, Multiaddr, PeerId, Transport}; +use libp2p_ping as ping; +use libp2p_rendezvous as rendezvous; +use libp2p_swarm::{keep_alive, Swarm, SwarmEvent}; use std::time::Duration; use void::Void; @@ -42,7 +39,11 @@ async fn main() { .unwrap(); let mut swarm = Swarm::with_tokio_executor( - tokio_development_transport(identity.clone()).unwrap(), + libp2p_tcp::tokio::Transport::default() + .upgrade(Version::V1) + .authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(), MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(identity.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), @@ -143,9 +144,12 @@ impl From for MyEvent { } } -#[derive(libp2p::swarm::NetworkBehaviour)] -#[behaviour(event_process = false)] -#[behaviour(out_event = "MyEvent")] +#[derive(libp2p_swarm::NetworkBehaviour)] +#[behaviour( + out_event = "MyEvent", + event_process = false, + prelude = "libp2p_swarm::derive_prelude" +)] struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, diff --git a/protocols/rendezvous/examples/register.rs b/protocols/rendezvous/examples/register.rs index 903a7961..98060c5c 100644 --- a/protocols/rendezvous/examples/register.rs +++ b/protocols/rendezvous/examples/register.rs @@ -19,13 +19,11 @@ // DEALINGS IN THE SOFTWARE. use futures::StreamExt; -use libp2p::core::identity; -use libp2p::core::PeerId; -use libp2p::ping; -use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::Multiaddr; -use libp2p::{rendezvous, tokio_development_transport}; +use libp2p_core::{identity, upgrade::Version, Multiaddr, PeerId, Transport}; +use libp2p_ping as ping; +use libp2p_rendezvous as rendezvous; use libp2p_swarm::AddressScore; +use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; use std::time::Duration; #[tokio::main] @@ -40,7 +38,11 @@ async fn main() { let identity = identity::Keypair::generate_ed25519(); let mut swarm = Swarm::with_tokio_executor( - tokio_development_transport(identity.clone()).unwrap(), + libp2p_tcp::tokio::Transport::default() + .upgrade(Version::V1) + .authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(), MyBehaviour { rendezvous: rendezvous::client::Behaviour::new(identity.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), @@ -130,8 +132,11 @@ impl From for MyEvent { } #[derive(NetworkBehaviour)] -#[behaviour(event_process = false)] -#[behaviour(out_event = "MyEvent")] +#[behaviour( + out_event = "MyEvent", + event_process = false, + prelude = "libp2p_swarm::derive_prelude" +)] struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, diff --git a/protocols/rendezvous/examples/register_with_identify.rs b/protocols/rendezvous/examples/register_with_identify.rs index 06844be1..8e0fac23 100644 --- a/protocols/rendezvous/examples/register_with_identify.rs +++ b/protocols/rendezvous/examples/register_with_identify.rs @@ -19,13 +19,11 @@ // DEALINGS IN THE SOFTWARE. use futures::StreamExt; -use libp2p::core::identity; -use libp2p::core::PeerId; -use libp2p::identify; -use libp2p::ping; -use libp2p::swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::Multiaddr; -use libp2p::{rendezvous, tokio_development_transport}; +use libp2p_core::{identity, upgrade::Version, Multiaddr, PeerId, Transport}; +use libp2p_identify as identify; +use libp2p_ping as ping; +use libp2p_rendezvous as rendezvous; +use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; use std::time::Duration; use void::Void; @@ -41,7 +39,11 @@ async fn main() { let identity = identity::Keypair::generate_ed25519(); let mut swarm = Swarm::with_tokio_executor( - tokio_development_transport(identity.clone()).unwrap(), + libp2p_tcp::tokio::Transport::default() + .upgrade(Version::V1) + .authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(), MyBehaviour { identify: identify::Behaviour::new(identify::Config::new( "rendezvous-example/1.0.0".to_string(), @@ -144,8 +146,11 @@ impl From for MyEvent { } #[derive(NetworkBehaviour)] -#[behaviour(event_process = false)] -#[behaviour(out_event = "MyEvent")] +#[behaviour( + out_event = "MyEvent", + event_process = false, + prelude = "libp2p_swarm::derive_prelude" +)] struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::client::Behaviour, diff --git a/protocols/rendezvous/examples/rendezvous_point.rs b/protocols/rendezvous/examples/rendezvous_point.rs index 1e98f73d..49680bfd 100644 --- a/protocols/rendezvous/examples/rendezvous_point.rs +++ b/protocols/rendezvous/examples/rendezvous_point.rs @@ -19,12 +19,11 @@ // DEALINGS IN THE SOFTWARE. use futures::StreamExt; -use libp2p::core::identity; -use libp2p::core::PeerId; -use libp2p::identify; -use libp2p::ping; -use libp2p::swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::{rendezvous, tokio_development_transport}; +use libp2p_core::{identity, upgrade::Version, PeerId, Transport}; +use libp2p_identify as identify; +use libp2p_ping as ping; +use libp2p_rendezvous as rendezvous; +use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; use void::Void; /// Examples for the rendezvous protocol: @@ -44,7 +43,11 @@ async fn main() { let identity = identity::Keypair::Ed25519(key.into()); let mut swarm = Swarm::with_tokio_executor( - tokio_development_transport(identity.clone()).unwrap(), + libp2p_tcp::tokio::Transport::default() + .upgrade(Version::V1) + .authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) + .multiplex(libp2p_yamux::YamuxConfig::default()) + .boxed(), MyBehaviour { identify: identify::Behaviour::new(identify::Config::new( "rendezvous-example/1.0.0".to_string(), @@ -131,8 +134,11 @@ impl From for MyEvent { } #[derive(NetworkBehaviour)] -#[behaviour(event_process = false)] -#[behaviour(out_event = "MyEvent")] +#[behaviour( + out_event = "MyEvent", + event_process = false, + prelude = "libp2p_swarm::derive_prelude" +)] struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::server::Behaviour, diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index 523f34c7..d22f0eb2 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -22,14 +22,14 @@ use async_trait::async_trait; use futures::stream::FusedStream; use futures::StreamExt; use futures::{future, Stream}; -use libp2p::core::transport::upgrade::Version; -use libp2p::core::transport::MemoryTransport; -use libp2p::core::upgrade::SelectUpgrade; -use libp2p::core::{identity, Multiaddr, PeerId, Transport}; -use libp2p::mplex::MplexConfig; -use libp2p::noise::NoiseAuthenticated; -use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::yamux::YamuxConfig; +use libp2p_core::transport::upgrade::Version; +use libp2p_core::transport::MemoryTransport; +use libp2p_core::upgrade::SelectUpgrade; +use libp2p_core::{identity, Multiaddr, PeerId, Transport}; +use libp2p_mplex::MplexConfig; +use libp2p_noise::NoiseAuthenticated; +use libp2p_swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p_yamux::YamuxConfig; use std::fmt::Debug; use std::time::Duration; @@ -107,15 +107,15 @@ where macro_rules! assert_behaviour_events { ($swarm: ident: $pat: pat, || $body: block) => { match await_event_or_timeout(&mut $swarm).await { - libp2p::swarm::SwarmEvent::Behaviour($pat) => $body, + libp2p_swarm::SwarmEvent::Behaviour($pat) => $body, _ => panic!("Unexpected combination of events emitted, check logs for details"), } }; ($swarm1: ident: $pat1: pat, $swarm2: ident: $pat2: pat, || $body: block) => { match await_events_or_timeout(&mut $swarm1, &mut $swarm2).await { ( - libp2p::swarm::SwarmEvent::Behaviour($pat1), - libp2p::swarm::SwarmEvent::Behaviour($pat2), + libp2p_swarm::SwarmEvent::Behaviour($pat1), + libp2p_swarm::SwarmEvent::Behaviour($pat2), ) => $body, _ => panic!("Unexpected combination of events emitted, check logs for details"), } diff --git a/protocols/rendezvous/tests/rendezvous.rs b/protocols/rendezvous/tests/rendezvous.rs index 85fdacd8..790e74c7 100644 --- a/protocols/rendezvous/tests/rendezvous.rs +++ b/protocols/rendezvous/tests/rendezvous.rs @@ -24,9 +24,9 @@ pub mod harness; use crate::harness::{await_event_or_timeout, await_events_or_timeout, new_swarm, SwarmExt}; use futures::stream::FuturesUnordered; use futures::StreamExt; -use libp2p::core::identity; -use libp2p::rendezvous; -use libp2p::swarm::{DialError, Swarm, SwarmEvent}; +use libp2p_core::identity; +use libp2p_rendezvous as rendezvous; +use libp2p_swarm::{DialError, Swarm, SwarmEvent}; use std::convert::TryInto; use std::time::Duration; @@ -366,8 +366,12 @@ async fn new_impersonating_client() -> Swarm { eve } -#[derive(libp2p::swarm::NetworkBehaviour)] -#[behaviour(event_process = false, out_event = "CombinedEvent")] +#[derive(libp2p_swarm::NetworkBehaviour)] +#[behaviour( + event_process = false, + out_event = "CombinedEvent", + prelude = "libp2p_swarm::derive_prelude" +)] struct CombinedBehaviour { client: rendezvous::client::Behaviour, server: rendezvous::server::Behaviour, diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 9f587980..76c15974 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -25,7 +25,9 @@ unsigned-varint = { version = "0.7", features = ["std", "futures"] } [dev-dependencies] async-std = "1.6.2" env_logger = "0.10.0" -libp2p = { path = "../..", features = ["full"] } +libp2p-noise = { path = "../../transports/noise" } +libp2p-tcp = { path = "../../transports/tcp", features = ["async-io"] } +libp2p-yamux = { path = "../../muxers/yamux" } rand = "0.8" # Passing arguments to the docsrs builder in order to properly document cfg's. diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index e97b725d..1e0af206 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -22,18 +22,17 @@ use async_trait::async_trait; use futures::{channel::mpsc, prelude::*, AsyncWriteExt}; -use libp2p::core::{ +use libp2p_core::{ identity, muxing::StreamMuxerBox, transport, upgrade::{self, read_length_prefixed, write_length_prefixed}, - Multiaddr, PeerId, + Multiaddr, PeerId, Transport, }; -use libp2p::noise::NoiseAuthenticated; -use libp2p::request_response::*; -use libp2p::swarm::{Swarm, SwarmEvent}; -use libp2p::tcp; -use libp2p_core::Transport; +use libp2p_noise::NoiseAuthenticated; +use libp2p_request_response::*; +use libp2p_swarm::{Swarm, SwarmEvent}; +use libp2p_tcp as tcp; use rand::{self, Rng}; use std::{io, iter}; @@ -302,7 +301,7 @@ fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) { tcp::async_io::Transport::new(tcp::Config::default().nodelay(true)) .upgrade(upgrade::Version::V1) .authenticate(NoiseAuthenticated::xx(&id_keys).unwrap()) - .multiplex(libp2p::yamux::YamuxConfig::default()) + .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(), ) } diff --git a/src/tutorials/ping.rs b/src/tutorials/ping.rs index 167c7d79..be473d54 100644 --- a/src/tutorials/ping.rs +++ b/src/tutorials/ping.rs @@ -174,8 +174,7 @@ //! [`Transport`] to the [`NetworkBehaviour`]. //! //! ```rust -//! use libp2p::{identity, PeerId, ping}; -//! use libp2p::swarm::Swarm; +//! use libp2p::{identity, PeerId, ping, Swarm}; //! use std::error::Error; //! //! #[async_std::main] @@ -226,8 +225,7 @@ //! remote peer. //! //! ```rust -//! use libp2p::{identity, Multiaddr, PeerId, ping}; -//! use libp2p::swarm::{Swarm, dial_opts::DialOpts}; +//! use libp2p::{identity, Multiaddr, PeerId, ping, Swarm, swarm::dial_opts::DialOpts}; //! use std::error::Error; //! //! #[async_std::main] @@ -271,8 +269,7 @@ //! //! ```no_run //! use futures::prelude::*; -//! use libp2p::swarm::{Swarm, SwarmEvent, dial_opts::DialOpts}; -//! use libp2p::{identity, Multiaddr, PeerId, ping}; +//! use libp2p::{identity, Multiaddr, PeerId, ping, swarm::{Swarm, SwarmEvent, dial_opts::DialOpts}}; //! use std::error::Error; //! //! #[async_std::main] diff --git a/transports/deflate/Cargo.toml b/transports/deflate/Cargo.toml index 83729d8e..dcd2bfcb 100644 --- a/transports/deflate/Cargo.toml +++ b/transports/deflate/Cargo.toml @@ -17,7 +17,7 @@ flate2 = "1.0" [dev-dependencies] async-std = "1.6.2" -libp2p = { path = "../..", features = ["full"] } +libp2p-tcp = { path = "../tcp", features = ["async-io"] } quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } rand = "0.8" diff --git a/transports/deflate/tests/test.rs b/transports/deflate/tests/test.rs index 2fa97853..c0fdd9dc 100644 --- a/transports/deflate/tests/test.rs +++ b/transports/deflate/tests/test.rs @@ -19,9 +19,9 @@ // DEALINGS IN THE SOFTWARE. use futures::{future, prelude::*}; -use libp2p::core::{transport::Transport, upgrade}; -use libp2p::deflate::DeflateConfig; -use libp2p::tcp; +use libp2p_core::{transport::Transport, upgrade}; +use libp2p_deflate::DeflateConfig; +use libp2p_tcp as tcp; use quickcheck::{QuickCheck, TestResult}; use rand::RngCore; diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index 61c80e27..3d67b628 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -573,7 +573,7 @@ fn invalid_data(e: impl Into>) -> io::E io::Error::new(io::ErrorKind::InvalidData, e) } -#[cfg(test)] +#[cfg(all(test, any(feature = "tokio", feature = "async-std")))] mod tests { use super::*; use futures::future::BoxFuture; diff --git a/transports/noise/Cargo.toml b/transports/noise/Cargo.toml index 19b77d9c..f0bf2864 100644 --- a/transports/noise/Cargo.toml +++ b/transports/noise/Cargo.toml @@ -31,11 +31,11 @@ snow = { version = "0.9.0", features = ["default-resolver"], default-features = [dev-dependencies] async-io = "1.2.0" +ed25519-compact = "1.0.11" env_logger = "0.10.0" -libp2p = { path = "../..", features = ["full"] } -quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } +libp2p-tcp = { path = "../tcp", features = ["async-io"] } libsodium-sys-stable = { version = "1.19.22", features = ["fetch-latest"] } -ed25519-compact = "2.0.2" +quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" } [build-dependencies] prost-build = "0.11" diff --git a/transports/noise/src/lib.rs b/transports/noise/src/lib.rs index 1837bdbd..45e5540d 100644 --- a/transports/noise/src/lib.rs +++ b/transports/noise/src/lib.rs @@ -40,8 +40,8 @@ //! //! ``` //! use libp2p_core::{identity, Transport, upgrade}; -//! use libp2p::tcp::TcpTransport; -//! use libp2p::noise::{Keypair, X25519Spec, NoiseAuthenticated}; +//! use libp2p_tcp::TcpTransport; +//! use libp2p_noise::{Keypair, X25519Spec, NoiseAuthenticated}; //! //! # fn main() { //! let id_keys = identity::Keypair::generate_ed25519(); diff --git a/transports/noise/tests/smoke.rs b/transports/noise/tests/smoke.rs index b9fe0027..7abfa205 100644 --- a/transports/noise/tests/smoke.rs +++ b/transports/noise/tests/smoke.rs @@ -23,14 +23,14 @@ use futures::{ future::{self, Either}, prelude::*, }; -use libp2p::core::identity; -use libp2p::core::transport::{self, Transport}; -use libp2p::core::upgrade::{self, apply_inbound, apply_outbound, Negotiated}; -use libp2p::noise::{ - Keypair, NoiseAuthenticated, NoiseConfig, NoiseOutput, RemoteIdentity, X25519Spec, X25519, +use libp2p_core::transport::Transport; +use libp2p_core::upgrade::{apply_inbound, apply_outbound, Negotiated}; +use libp2p_core::{identity, transport, upgrade}; +use libp2p_noise::{ + Keypair, NoiseAuthenticated, NoiseConfig, NoiseError, NoiseOutput, RemoteIdentity, X25519Spec, + X25519, }; -use libp2p::tcp; -use libp2p_noise::NoiseError; +use libp2p_tcp as tcp; use log::info; use quickcheck::*; use std::{convert::TryInto, io, net::TcpStream}; diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index b759972b..6700fd11 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -38,8 +38,10 @@ rustc-args = ["--cfg", "docsrs"] [dev-dependencies] async-std = { version = "1.12.0", features = ["attributes"] } env_logger = "0.10.0" -libp2p = { path = "../..", features = ["tcp", "yamux", "noise", "async-std"] } libp2p-muxer-test-harness = { path = "../../muxers/test-harness" } +libp2p-noise = { path = "../noise" } +libp2p-tcp = { path = "../tcp", features = ["async-io"] } +libp2p-yamux = { path = "../../muxers/yamux" } quickcheck = "1" tokio = { version = "1.21.1", features = ["macros", "rt-multi-thread", "time"] } diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index 3a58b2a5..bd6dd3f1 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -4,14 +4,14 @@ use futures::channel::{mpsc, oneshot}; use futures::future::{poll_fn, Either}; use futures::stream::StreamExt; use futures::{future, AsyncReadExt, AsyncWriteExt, FutureExt, SinkExt}; -use libp2p::core::multiaddr::Protocol; -use libp2p::core::Transport; -use libp2p::{noise, tcp, yamux, Multiaddr}; use libp2p_core::either::EitherOutput; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt, SubstreamBox}; use libp2p_core::transport::{Boxed, OrTransport, TransportEvent}; -use libp2p_core::{upgrade, PeerId}; +use libp2p_core::{multiaddr::Protocol, upgrade, Multiaddr, PeerId, Transport}; +use libp2p_noise as noise; use libp2p_quic as quic; +use libp2p_tcp as tcp; +use libp2p_yamux as yamux; use quic::Provider; use rand::RngCore; use std::future::Future; @@ -94,7 +94,7 @@ async fn ipv4_dial_ipv6() { #[async_std::test] #[ignore] // Transport currently does not validate PeerId. Enable once we make use of PeerId validation in rustls. async fn wrong_peerid() { - use libp2p::PeerId; + use libp2p_core::PeerId; let (a_peer_id, mut a_transport) = create_default_transport::(); let (b_peer_id, mut b_transport) = create_default_transport::(); @@ -189,7 +189,7 @@ async fn draft_29_support() { use std::task::Poll; use futures::{future::poll_fn, select}; - use libp2p::TransportError; + use libp2p_core::transport::TransportError; let _ = env_logger::try_init(); @@ -379,8 +379,8 @@ macro_rules! swap_protocol { }; } -fn generate_tls_keypair() -> libp2p::identity::Keypair { - libp2p::identity::Keypair::generate_ed25519() +fn generate_tls_keypair() -> libp2p_core::identity::Keypair { + libp2p_core::identity::Keypair::generate_ed25519() } fn create_default_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) { diff --git a/transports/tls/Cargo.toml b/transports/tls/Cargo.toml index 7755ce60..f0b9e637 100644 --- a/transports/tls/Cargo.toml +++ b/transports/tls/Cargo.toml @@ -28,7 +28,9 @@ features = ["dangerous_configuration"] # Must enable this to allow for custom ve [dev-dependencies] hex = "0.4.3" hex-literal = "0.3.4" -libp2p = { path = "../..", features = ["yamux", "rsa", "ecdsa", "secp256k1"], default-features = false } +libp2p-core = { path = "../../core", features = ["rsa", "ecdsa", "secp256k1"] } +libp2p-swarm = { path = "../../swarm" } +libp2p-yamux = { path = "../../muxers/yamux" } tokio = { version = "1.21.1", features = ["full"] } # Passing arguments to the docsrs builder in order to properly document cfg's. diff --git a/transports/tls/tests/smoke.rs b/transports/tls/tests/smoke.rs index d30753b8..1d49fd5c 100644 --- a/transports/tls/tests/smoke.rs +++ b/transports/tls/tests/smoke.rs @@ -1,10 +1,9 @@ use futures::{future, StreamExt}; -use libp2p::multiaddr::Protocol; -use libp2p::swarm::{keep_alive, SwarmEvent}; -use libp2p::Swarm; +use libp2p_core::multiaddr::Protocol; use libp2p_core::transport::MemoryTransport; use libp2p_core::upgrade::Version; use libp2p_core::Transport; +use libp2p_swarm::{keep_alive, Swarm, SwarmEvent}; #[tokio::test] async fn can_establish_connection() { @@ -57,12 +56,12 @@ async fn can_establish_connection() { } fn make_swarm() -> Swarm { - let identity = libp2p::identity::Keypair::generate_ed25519(); + let identity = libp2p_core::identity::Keypair::generate_ed25519(); let transport = MemoryTransport::default() .upgrade(Version::V1) .authenticate(libp2p_tls::Config::new(&identity).unwrap()) - .multiplex(libp2p::yamux::YamuxConfig::default()) + .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(); Swarm::without_executor( diff --git a/transports/webrtc/Cargo.toml b/transports/webrtc/Cargo.toml index e76e26cd..04353757 100644 --- a/transports/webrtc/Cargo.toml +++ b/transports/webrtc/Cargo.toml @@ -45,10 +45,12 @@ prost-build = "0.11" anyhow = "1.0" env_logger = "0.10" hex-literal = "0.3" -libp2p = { path = "../..", features = ["full"] } +libp2p-swarm = { path = "../../swarm", features = ["macros", "tokio"] } +libp2p-ping = { path = "../../protocols/ping" } tokio = { version = "1.19", features = ["full"] } unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } void = "1" +quickcheck = "1.0.3" [[test]] name = "smoke" diff --git a/transports/webrtc/examples/listen_ping.rs b/transports/webrtc/examples/listen_ping.rs index 55e19044..b6cf16e6 100644 --- a/transports/webrtc/examples/listen_ping.rs +++ b/transports/webrtc/examples/listen_ping.rs @@ -1,10 +1,10 @@ use anyhow::Result; use futures::StreamExt; -use libp2p::swarm::{keep_alive, NetworkBehaviour}; -use libp2p::Transport; -use libp2p::{ping, Swarm}; use libp2p_core::identity; use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::Transport; +use libp2p_ping as ping; +use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm}; use rand::thread_rng; use void::Void; @@ -41,7 +41,11 @@ fn create_swarm() -> Result> { } #[derive(NetworkBehaviour, Default)] -#[behaviour(out_event = "Event", event_process = false)] +#[behaviour( + out_event = "Event", + event_process = false, + prelude = "libp2p_swarm::derive_prelude" +)] struct Behaviour { ping: ping::Behaviour, keep_alive: keep_alive::Behaviour, diff --git a/transports/webrtc/tests/smoke.rs b/transports/webrtc/tests/smoke.rs index c15d9f9a..68821dcd 100644 --- a/transports/webrtc/tests/smoke.rs +++ b/transports/webrtc/tests/smoke.rs @@ -18,469 +18,355 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use anyhow::Result; -use async_trait::async_trait; -use futures::{ - future::{select, Either, FutureExt}, - io::{AsyncRead, AsyncWrite, AsyncWriteExt}, - stream::StreamExt, -}; -use libp2p::core::{identity, muxing::StreamMuxerBox, upgrade, Transport as _}; -use libp2p::request_response::{ - ProtocolName, ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, -}; -use libp2p::swarm::{Swarm, SwarmEvent}; -use libp2p::webrtc::tokio as webrtc; +use futures::channel::mpsc; +use futures::future::{BoxFuture, Either}; +use futures::stream::StreamExt; +use futures::{future, ready, AsyncReadExt, AsyncWriteExt, FutureExt, SinkExt}; +use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; +use libp2p_core::transport::{Boxed, TransportEvent}; +use libp2p_core::{Multiaddr, PeerId, Transport}; +use libp2p_webrtc as webrtc; use rand::{thread_rng, RngCore}; - -use std::{io, iter}; +use std::future::Future; +use std::num::NonZeroU8; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; #[tokio::test] -async fn smoke() -> Result<()> { - let _ = env_logger::builder().is_test(true).try_init(); +async fn smoke() { + let _ = env_logger::try_init(); - let mut rng = rand::thread_rng(); + let (a_peer_id, mut a_transport) = create_transport(); + let (b_peer_id, mut b_transport) = create_transport(); - let mut a = create_swarm()?; - let mut b = create_swarm()?; + let addr = start_listening(&mut a_transport, "/ip4/127.0.0.1/udp/0/webrtc").await; + start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/webrtc").await; + let ((a_connected, _, _), (b_connected, _)) = + connect(&mut a_transport, &mut b_transport, addr).await; - Swarm::listen_on(&mut a, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?; - Swarm::listen_on(&mut b, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?; - - let addr = match a.next().await { - Some(SwarmEvent::NewListenAddr { address, .. }) => address, - e => panic!("{:?}", e), - }; - - // skip other interface addresses - while a.next().now_or_never().is_some() {} - - let _ = match b.next().await { - Some(SwarmEvent::NewListenAddr { address, .. }) => address, - e => panic!("{:?}", e), - }; - - // skip other interface addresses - while b.next().now_or_never().is_some() {} - - let mut data = vec![0; 4096]; - rng.fill_bytes(&mut data); - - b.behaviour_mut() - .add_address(Swarm::local_peer_id(&a), addr); - b.behaviour_mut() - .send_request(Swarm::local_peer_id(&a), Ping(data.clone())); - - match b.next().await { - Some(SwarmEvent::Dialing(_)) => {} - e => panic!("{:?}", e), - } - - let pair = select(a.next(), b.next()); - match pair.await { - Either::Left((Some(SwarmEvent::IncomingConnection { .. }), _)) => {} - Either::Left((e, _)) => panic!("{:?}", e), - Either::Right(_) => panic!("b completed first"), - } - - let pair = select(a.next(), b.next()); - match pair.await { - Either::Left((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {} - Either::Left((e, _)) => panic!("{:?}", e), - Either::Right((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {} - Either::Right((e, _)) => panic!("{:?}", e), - } - - let pair = select(a.next(), b.next()); - match pair.await { - Either::Left((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {} - Either::Left((e, _)) => panic!("{:?}", e), - Either::Right((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {} - Either::Right((e, _)) => panic!("{:?}", e), - } - - assert!(b.next().now_or_never().is_none()); - - let pair = select(a.next(), b.next()); - match pair.await { - Either::Left(( - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Request { - request: Ping(ping), - channel, - .. - }, - .. - })), - _, - )) => { - a.behaviour_mut() - .send_response(channel, Pong(ping)) - .unwrap(); - } - Either::Left((e, _)) => panic!("{:?}", e), - Either::Right(_) => panic!("b completed first"), - } - - match a.next().await { - Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => {} - e => panic!("{:?}", e), - } - - let pair = select(a.next(), b.next()); - match pair.await { - Either::Right(( - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Response { - response: Pong(pong), - .. - }, - .. - })), - _, - )) => assert_eq!(data, pong), - Either::Right((e, _)) => panic!("{:?}", e), - Either::Left(_) => panic!("a completed first"), - } - - a.behaviour_mut().send_request( - Swarm::local_peer_id(&b), - Ping(b"another substream".to_vec()), - ); - - assert!(a.next().now_or_never().is_none()); - - let pair = select(a.next(), b.next()); - match pair.await { - Either::Right(( - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Request { - request: Ping(data), - channel, - .. - }, - .. - })), - _, - )) => { - b.behaviour_mut() - .send_response(channel, Pong(data)) - .unwrap(); - } - Either::Right((e, _)) => panic!("{:?}", e), - Either::Left(_) => panic!("a completed first"), - } - - match b.next().await { - Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => {} - e => panic!("{:?}", e), - } - - let pair = select(a.next(), b.next()); - match pair.await { - Either::Left(( - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Response { - response: Pong(data), - .. - }, - .. - })), - _, - )) => assert_eq!(data, b"another substream".to_vec()), - Either::Left((e, _)) => panic!("{:?}", e), - Either::Right(_) => panic!("b completed first"), - } - - Ok(()) + assert_eq!(a_connected, b_peer_id); + assert_eq!(b_connected, a_peer_id); } -#[tokio::test] -async fn dial_failure() -> Result<()> { - let _ = env_logger::builder().is_test(true).try_init(); +// Note: This test should likely be ported to the muxer compliance test suite. +#[test] +fn concurrent_connections_and_streams_tokio() { + let _ = env_logger::try_init(); - let mut a = create_swarm()?; - let mut b = create_swarm()?; - - Swarm::listen_on(&mut a, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?; - Swarm::listen_on(&mut b, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?; - - let addr = match a.next().await { - Some(SwarmEvent::NewListenAddr { address, .. }) => address, - e => panic!("{:?}", e), - }; - - // skip other interface addresses - while a.next().now_or_never().is_some() {} - - let _ = match b.next().await { - Some(SwarmEvent::NewListenAddr { address, .. }) => address, - e => panic!("{:?}", e), - }; - - // skip other interface addresses - while b.next().now_or_never().is_some() {} - - let a_peer_id = &Swarm::local_peer_id(&a).clone(); - drop(a); // stop a swarm so b can never reach it - - b.behaviour_mut().add_address(a_peer_id, addr); - b.behaviour_mut() - .send_request(a_peer_id, Ping(b"hello world".to_vec())); - - match b.next().await { - Some(SwarmEvent::Dialing(_)) => {} - e => panic!("{:?}", e), - } - - match b.next().await { - Some(SwarmEvent::OutgoingConnectionError { .. }) => {} - e => panic!("{:?}", e), - }; - - match b.next().await { - Some(SwarmEvent::Behaviour(RequestResponseEvent::OutboundFailure { .. })) => {} - e => panic!("{:?}", e), - }; - - Ok(()) + let rt = tokio::runtime::Runtime::new().unwrap(); + let _guard = rt.enter(); + quickcheck::QuickCheck::new() + .min_tests_passed(1) + .quickcheck(prop as fn(_, _) -> _); } -#[tokio::test] -async fn concurrent_connections_and_streams() { - let _ = env_logger::builder().is_test(true).try_init(); +fn generate_tls_keypair() -> libp2p_core::identity::Keypair { + libp2p_core::identity::Keypair::generate_ed25519() +} - let num_listeners = 3usize; - let num_streams = 8usize; +fn create_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) { + let keypair = generate_tls_keypair(); + let peer_id = keypair.public().to_peer_id(); - let mut data = vec![0; 4096]; - rand::thread_rng().fill_bytes(&mut data); - let mut listeners = vec![]; + let transport = webrtc::tokio::Transport::new( + keypair, + webrtc::tokio::Certificate::generate(&mut thread_rng()).unwrap(), + ) + .map(|(p, c), _| (p, StreamMuxerBox::new(c))) + .boxed(); + + (peer_id, transport) +} + +async fn start_listening(transport: &mut Boxed<(PeerId, StreamMuxerBox)>, addr: &str) -> Multiaddr { + transport.listen_on(addr.parse().unwrap()).unwrap(); + match transport.next().await { + Some(TransportEvent::NewAddress { listen_addr, .. }) => listen_addr, + e => panic!("{:?}", e), + } +} + +fn prop(number_listeners: NonZeroU8, number_streams: NonZeroU8) -> quickcheck::TestResult { + const BUFFER_SIZE: usize = 4096 * 10; + + let number_listeners = u8::from(number_listeners) as usize; + let number_streams = u8::from(number_streams) as usize; + + if number_listeners > 10 || number_streams > 10 { + return quickcheck::TestResult::discard(); + } + + let (listeners_tx, mut listeners_rx) = mpsc::channel(number_listeners); + + log::info!("Creating {number_streams} streams on {number_listeners} connections"); // Spawn the listener nodes. - for _ in 0..num_listeners { - let mut listener = create_swarm().unwrap(); - Swarm::listen_on( - &mut listener, - "/ip4/127.0.0.1/udp/0/webrtc".parse().unwrap(), - ) - .unwrap(); + for _ in 0..number_listeners { + tokio::spawn({ + let mut listeners_tx = listeners_tx.clone(); - // Wait to listen on address. - let addr = match listener.next().await { - Some(SwarmEvent::NewListenAddr { address, .. }) => address, - e => panic!("{:?}", e), - }; + async move { + let (peer_id, mut listener) = create_transport(); + let addr = start_listening(&mut listener, "/ip4/127.0.0.1/udp/0/webrtc").await; - listeners.push((*listener.local_peer_id(), addr)); + listeners_tx.send((peer_id, addr)).await.unwrap(); - tokio::spawn(async move { - loop { - match listener.next().await { - Some(SwarmEvent::IncomingConnection { .. }) => { - log::debug!("listener IncomingConnection"); - } - Some(SwarmEvent::ConnectionEstablished { .. }) => { - log::debug!("listener ConnectionEstablished"); - } - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Request { - request: Ping(ping), - channel, - .. - }, - .. - })) => { - log::debug!("listener got Message"); - listener - .behaviour_mut() - .send_response(channel, Pong(ping)) - .unwrap(); - } - Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => { - log::debug!("listener ResponseSent"); - } - Some(SwarmEvent::ConnectionClosed { .. }) => {} - Some(SwarmEvent::NewListenAddr { .. }) => { - log::debug!("listener NewListenAddr"); - } - Some(e) => { - panic!("unexpected event {:?}", e); - } - None => { - panic!("listener stopped"); + loop { + if let TransportEvent::Incoming { upgrade, .. } = + listener.select_next_some().await + { + let (_, connection) = upgrade.await.unwrap(); + + tokio::spawn(answer_inbound_streams::(connection)); } } } }); } - let mut dialer = create_swarm().unwrap(); - Swarm::listen_on(&mut dialer, "/ip4/127.0.0.1/udp/0/webrtc".parse().unwrap()).unwrap(); - - // Wait to listen on address. - match dialer.next().await { - Some(SwarmEvent::NewListenAddr { address, .. }) => address, - e => panic!("{:?}", e), - }; + let (completed_streams_tx, completed_streams_rx) = + mpsc::channel(number_streams * number_listeners); // For each listener node start `number_streams` requests. - for (listener_peer_id, listener_addr) in &listeners { - dialer - .behaviour_mut() - .add_address(listener_peer_id, listener_addr.clone()); + tokio::spawn(async move { + let (_, mut dialer) = create_transport(); - dialer.dial(*listener_peer_id).unwrap(); + while let Some((_, listener_addr)) = listeners_rx.next().await { + let (_, connection) = Dial::new(&mut dialer, listener_addr.clone()).await; + + tokio::spawn(open_outbound_streams::( + connection, + number_streams, + completed_streams_tx.clone(), + )); + } + + // Drive the dialer. + loop { + dialer.next().await; + } + }); + + let completed_streams = number_streams * number_listeners; + + // Wait for all streams to complete. + tokio::runtime::Handle::current() + .block_on(tokio::time::timeout( + Duration::from_secs(30), + completed_streams_rx + .take(completed_streams) + .collect::>(), + )) + .unwrap(); + + quickcheck::TestResult::passed() +} + +async fn answer_inbound_streams(mut connection: StreamMuxerBox) { + loop { + let mut inbound_stream = match future::poll_fn(|cx| { + let _ = connection.poll_unpin(cx)?; + + connection.poll_inbound_unpin(cx) + }) + .await + { + Ok(s) => s, + Err(_) => return, + }; + + tokio::spawn(async move { + // FIXME: Need to write _some_ data before we can read on both sides. + // Do a ping-pong exchange. + { + let mut pong = [0u8; 4]; + inbound_stream.write_all(b"PING").await.unwrap(); + inbound_stream.flush().await.unwrap(); + inbound_stream.read_exact(&mut pong).await.unwrap(); + assert_eq!(&pong, b"PONG"); + } + + let mut data = vec![0; BUFFER_SIZE]; + + inbound_stream.read_exact(&mut data).await.unwrap(); + inbound_stream.write_all(&data).await.unwrap(); + inbound_stream.close().await.unwrap(); + }); + } +} + +async fn open_outbound_streams( + mut connection: StreamMuxerBox, + number_streams: usize, + completed_streams_tx: mpsc::Sender<()>, +) { + for _ in 0..number_streams { + let mut outbound_stream = future::poll_fn(|cx| { + let _ = connection.poll_unpin(cx)?; + + connection.poll_outbound_unpin(cx) + }) + .await + .unwrap(); + + tokio::spawn({ + let mut completed_streams_tx = completed_streams_tx.clone(); + + async move { + // FIXME: Need to write _some_ data before we can read on both sides. + // Do a ping-pong exchange. + { + let mut ping = [0u8; 4]; + outbound_stream.write_all(b"PONG").await.unwrap(); + outbound_stream.flush().await.unwrap(); + outbound_stream.read_exact(&mut ping).await.unwrap(); + assert_eq!(&ping, b"PING"); + } + + let mut data = vec![0; BUFFER_SIZE]; + rand::thread_rng().fill_bytes(&mut data); + + let mut received = Vec::new(); + + outbound_stream.write_all(&data).await.unwrap(); + outbound_stream.flush().await.unwrap(); + outbound_stream.read_to_end(&mut received).await.unwrap(); + + assert_eq!(received, data); + + completed_streams_tx.send(()).await.unwrap(); + } + }); } - // Wait for responses to each request. - let mut num_responses = 0; - loop { - match dialer.next().await { - Some(SwarmEvent::Dialing(_)) => { - log::debug!("dialer Dialing"); - } - Some(SwarmEvent::ConnectionEstablished { peer_id, .. }) => { - log::debug!("dialer Connection established"); - for _ in 0..num_streams { - dialer - .behaviour_mut() - .send_request(&peer_id, Ping(data.clone())); + log::info!("Created {number_streams} streams"); + + while future::poll_fn(|cx| connection.poll_unpin(cx)) + .await + .is_ok() + {} +} + +async fn connect( + a_transport: &mut Boxed<(PeerId, StreamMuxerBox)>, + b_transport: &mut Boxed<(PeerId, StreamMuxerBox)>, + addr: Multiaddr, +) -> ( + (PeerId, Multiaddr, StreamMuxerBox), + (PeerId, StreamMuxerBox), +) { + match futures::future::select( + ListenUpgrade::new(a_transport), + Dial::new(b_transport, addr), + ) + .await + { + Either::Left((listen_done, dial)) => { + let mut pending_dial = dial; + + loop { + match future::select(pending_dial, a_transport.next()).await { + Either::Left((dial_done, _)) => return (listen_done, dial_done), + Either::Right((_, dial)) => { + pending_dial = dial; + } } } - Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { - message: - RequestResponseMessage::Response { - response: Pong(pong), - .. - }, - .. - })) => { - log::debug!("dialer got Message"); - num_responses += 1; - assert_eq!(data, pong); - let should_be = num_listeners * num_streams; - log::debug!( - "num of responses: {}, num of listeners * num of streams: {}", - num_responses, - should_be - ); - if num_responses == should_be { - break; + } + Either::Right((dial_done, listen)) => { + let mut pending_listen = listen; + + loop { + match future::select(pending_listen, b_transport.next()).await { + Either::Left((listen_done, _)) => return (listen_done, dial_done), + Either::Right((_, listen)) => { + pending_listen = listen; + } } } - Some(SwarmEvent::ConnectionClosed { .. }) => { - log::debug!("dialer ConnectionClosed"); - } - Some(SwarmEvent::NewListenAddr { .. }) => { - log::debug!("dialer NewListenAddr"); - } - e => { - panic!("unexpected event {:?}", e); - } } } } -#[derive(Debug, Clone)] -struct PingProtocol(); +struct ListenUpgrade<'a> { + listener: &'a mut Boxed<(PeerId, StreamMuxerBox)>, + listener_upgrade_task: Option>, +} -#[derive(Clone)] -struct PingCodec(); - -#[derive(Debug, Clone, PartialEq, Eq)] -struct Ping(Vec); - -#[derive(Debug, Clone, PartialEq, Eq)] -struct Pong(Vec); - -impl ProtocolName for PingProtocol { - fn protocol_name(&self) -> &[u8] { - "/ping/1".as_bytes() +impl<'a> ListenUpgrade<'a> { + pub fn new(listener: &'a mut Boxed<(PeerId, StreamMuxerBox)>) -> Self { + Self { + listener, + listener_upgrade_task: None, + } } } -#[async_trait] -impl RequestResponseCodec for PingCodec { - type Protocol = PingProtocol; - type Request = Ping; - type Response = Pong; +struct Dial<'a> { + dialer: &'a mut Boxed<(PeerId, StreamMuxerBox)>, + dial_task: BoxFuture<'static, (PeerId, StreamMuxerBox)>, +} - async fn read_request(&mut self, _: &PingProtocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - upgrade::read_length_prefixed(io, 4096) - .map(|res| match res { - Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), - Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()), - Ok(vec) => Ok(Ping(vec)), - }) - .await - } - - async fn read_response(&mut self, _: &PingProtocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - upgrade::read_length_prefixed(io, 4096) - .map(|res| match res { - Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), - Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()), - Ok(vec) => Ok(Pong(vec)), - }) - .await - } - - async fn write_request( - &mut self, - _: &PingProtocol, - io: &mut T, - Ping(data): Ping, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - upgrade::write_length_prefixed(io, data).await?; - io.close().await?; - Ok(()) - } - - async fn write_response( - &mut self, - _: &PingProtocol, - io: &mut T, - Pong(data): Pong, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - upgrade::write_length_prefixed(io, data).await?; - io.close().await?; - Ok(()) +impl<'a> Dial<'a> { + fn new(dialer: &'a mut Boxed<(PeerId, StreamMuxerBox)>, addr: Multiaddr) -> Self { + Self { + dial_task: dialer.dial(addr).unwrap().map(|r| r.unwrap()).boxed(), + dialer, + } } } -fn create_swarm() -> Result>> { - let id_keys = identity::Keypair::generate_ed25519(); - let peer_id = id_keys.public().to_peer_id(); - let transport = webrtc::Transport::new( - id_keys, - webrtc::Certificate::generate(&mut thread_rng()).unwrap(), - ); +impl Future for Dial<'_> { + type Output = (PeerId, StreamMuxerBox); - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); - let behaviour = RequestResponse::new(PingCodec(), protocols, cfg); - let transport = transport - .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))) - .boxed(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match self.dialer.poll_next_unpin(cx) { + Poll::Ready(_) => { + continue; + } + Poll::Pending => {} + } - Ok(Swarm::with_tokio_executor(transport, behaviour, peer_id)) + let conn = ready!(self.dial_task.poll_unpin(cx)); + return Poll::Ready(conn); + } + } +} + +impl Future for ListenUpgrade<'_> { + type Output = (PeerId, Multiaddr, StreamMuxerBox); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + match dbg!(self.listener.poll_next_unpin(cx)) { + Poll::Ready(Some(TransportEvent::Incoming { + upgrade, + send_back_addr, + .. + })) => { + self.listener_upgrade_task = Some( + async move { + let (peer, conn) = upgrade.await.unwrap(); + + (peer, send_back_addr, conn) + } + .boxed(), + ); + continue; + } + Poll::Ready(None) => unreachable!("stream never ends"), + Poll::Ready(Some(_)) => continue, + Poll::Pending => {} + } + + let conn = match self.listener_upgrade_task.as_mut() { + None => return Poll::Pending, + Some(inner) => ready!(inner.poll_unpin(cx)), + }; + + return Poll::Ready(conn); + } + } } diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index cc848e58..a9d43395 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -24,9 +24,9 @@ url = "2.1" webpki-roots = "0.22" [dev-dependencies] -libp2p = { path = "../..", features = ["full"] } +libp2p-tcp = { path = "../tcp", features = ["async-io"] } -# Passing arguments to the docsrs builder in order to properly document cfg's. +# Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] all-features = true diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index dd6d9ef2..55b37db8 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -221,8 +221,8 @@ where mod tests { use super::WsConfig; use futures::prelude::*; - use libp2p::core::{multiaddr::Protocol, Multiaddr, PeerId, Transport}; - use libp2p::tcp; + use libp2p_core::{multiaddr::Protocol, Multiaddr, PeerId, Transport}; + use libp2p_tcp as tcp; #[test] fn dialer_connects_to_listener_ipv4() {