refactor: move examples to common location

Refactor examples into separate binary crates.

Fixes https://github.com/libp2p/rust-libp2p/issues/3111.

Pull-Request: #3509.
This commit is contained in:
Oleg Kubrakov
2023-03-08 16:17:33 +08:00
committed by GitHub
parent 2c10cd838a
commit b63e05dad6
30 changed files with 1169 additions and 1495 deletions

View File

@ -213,7 +213,7 @@ jobs:
save-if: ${{ github.ref == 'refs/heads/master' }} save-if: ${{ github.ref == 'refs/heads/master' }}
- name: Run ipfs-kad example - name: Run ipfs-kad example
run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad --features full run: cd ./examples/ipfs-kad/ && RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run
rustfmt: rustfmt:
runs-on: ubuntu-latest runs-on: ubuntu-latest

155
Cargo.lock generated
View File

@ -306,7 +306,7 @@ dependencies = [
"slab", "slab",
"socket2", "socket2",
"waker-fn", "waker-fn",
"windows-sys", "windows-sys 0.42.0",
] ]
[[package]] [[package]]
@ -346,7 +346,7 @@ dependencies = [
"futures-lite", "futures-lite",
"libc", "libc",
"signal-hook", "signal-hook",
"windows-sys", "windows-sys 0.42.0",
] ]
[[package]] [[package]]
@ -634,6 +634,17 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "chat-example"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"env_logger 0.10.0",
"futures",
"libp2p",
]
[[package]] [[package]]
name = "ciborium" name = "ciborium"
version = "0.2.0" version = "0.2.0"
@ -1104,6 +1115,18 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "dcutr"
version = "0.1.0"
dependencies = [
"clap 4.1.6",
"env_logger 0.10.0",
"futures",
"futures-timer",
"libp2p",
"log",
]
[[package]] [[package]]
name = "der" name = "der"
version = "0.6.1" version = "0.6.1"
@ -1205,6 +1228,18 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "distributed-key-value-store"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"env_logger 0.10.0",
"futures",
"libp2p",
"multiaddr",
]
[[package]] [[package]]
name = "dtoa" name = "dtoa"
version = "1.0.5" version = "1.0.5"
@ -1373,6 +1408,20 @@ version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a214f5bb88731d436478f3ae1f8a277b62124089ba9fb67f4f93fb100ef73c90" checksum = "a214f5bb88731d436478f3ae1f8a277b62124089ba9fb67f4f93fb100ef73c90"
[[package]]
name = "file-sharing"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"clap 4.1.6",
"either",
"env_logger 0.10.0",
"futures",
"libp2p",
"multiaddr",
]
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.0.25" version = "1.0.25"
@ -1823,6 +1872,16 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "identify"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"futures",
"libp2p",
]
[[package]] [[package]]
name = "idna" name = "idna"
version = "0.2.3" version = "0.2.3"
@ -1946,7 +2005,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e" checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys", "windows-sys 0.42.0",
] ]
[[package]] [[package]]
@ -1961,6 +2020,30 @@ dependencies = [
"winreg", "winreg",
] ]
[[package]]
name = "ipfs-kad"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"env_logger 0.10.0",
"futures",
"libp2p",
]
[[package]]
name = "ipfs-private"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"either",
"env_logger 0.10.0",
"futures",
"libp2p",
"multiaddr",
]
[[package]] [[package]]
name = "ipnet" name = "ipnet"
version = "2.7.1" version = "2.7.1"
@ -1976,7 +2059,7 @@ dependencies = [
"hermit-abi 0.2.6", "hermit-abi 0.2.6",
"io-lifetimes", "io-lifetimes",
"rustix", "rustix",
"windows-sys", "windows-sys 0.42.0",
] ]
[[package]] [[package]]
@ -2942,7 +3025,7 @@ dependencies = [
"libc", "libc",
"log", "log",
"wasi 0.11.0+wasi-snapshot-preview1", "wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys", "windows-sys 0.42.0",
] ]
[[package]] [[package]]
@ -3305,7 +3388,7 @@ dependencies = [
"libc", "libc",
"redox_syscall", "redox_syscall",
"smallvec", "smallvec",
"windows-sys", "windows-sys 0.42.0",
] ]
[[package]] [[package]]
@ -3376,6 +3459,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ping-example"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"futures",
"libp2p",
"multiaddr",
]
[[package]] [[package]]
name = "pkcs8" name = "pkcs8"
version = "0.9.0" version = "0.9.0"
@ -3437,7 +3531,7 @@ dependencies = [
"libc", "libc",
"log", "log",
"wepoll-ffi", "wepoll-ffi",
"windows-sys", "windows-sys 0.42.0",
] ]
[[package]] [[package]]
@ -3801,6 +3895,19 @@ version = "0.6.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848"
[[package]]
name = "rendezvous-example"
version = "0.1.0"
dependencies = [
"async-std",
"async-trait",
"env_logger 0.10.0",
"futures",
"libp2p",
"log",
"tokio",
]
[[package]] [[package]]
name = "resolv-conf" name = "resolv-conf"
version = "0.7.0" version = "0.7.0"
@ -3935,7 +4042,7 @@ dependencies = [
"io-lifetimes", "io-lifetimes",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
"windows-sys", "windows-sys 0.42.0",
] ]
[[package]] [[package]]
@ -4387,7 +4494,7 @@ dependencies = [
"fastrand", "fastrand",
"redox_syscall", "redox_syscall",
"rustix", "rustix",
"windows-sys", "windows-sys 0.42.0",
] ]
[[package]] [[package]]
@ -4479,9 +4586,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.25.0" version = "1.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes", "bytes",
@ -4494,7 +4601,7 @@ dependencies = [
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2",
"tokio-macros", "tokio-macros",
"windows-sys", "windows-sys 0.45.0",
] ]
[[package]] [[package]]
@ -5238,6 +5345,30 @@ dependencies = [
"windows_x86_64_msvc 0.42.1", "windows_x86_64_msvc 0.42.1",
] ]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.42.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.42.1",
"windows_i686_gnu 0.42.1",
"windows_i686_msvc 0.42.1",
"windows_x86_64_gnu 0.42.1",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.42.1",
]
[[package]] [[package]]
name = "windows_aarch64_gnullvm" name = "windows_aarch64_gnullvm"
version = "0.42.1" version = "0.42.1"

View File

@ -142,6 +142,15 @@ libp2p-tcp = { path = "transports/tcp", features = ["tokio"] }
[workspace] [workspace]
members = [ members = [
"core", "core",
"examples/chat-example",
"examples/dcutr",
"examples/distributed-key-value-store",
"examples/file-sharing",
"examples/identify",
"examples/ipfs-kad",
"examples/ipfs-private",
"examples/ping-example",
"examples/rendezvous",
"misc/metrics", "misc/metrics",
"misc/multistream-select", "misc/multistream-select",
"misc/rw-stream-sink", "misc/rw-stream-sink",
@ -180,42 +189,6 @@ members = [
"interop-tests" "interop-tests"
] ]
[[example]]
name = "chat"
required-features = ["full"]
[[example]]
name = "chat-tokio"
required-features = ["full"]
[[example]]
name = "file-sharing"
required-features = ["full"]
[[example]]
name = "gossipsub-chat"
required-features = ["full"]
[[example]]
name = "ipfs-private"
required-features = ["full"]
[[example]]
name = "ipfs-kad"
required-features = ["full"]
[[example]]
name = "ping"
required-features = ["full"]
[[example]]
name = "mdns-passive-discovery"
required-features = ["full"]
[[example]]
name = "distributed-key-value-store"
required-features = ["full"]
# Passing arguments to the docsrs builder in order to properly document cfg's. # Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling # More information: https://docs.rs/about/builds#cross-compiling
[package.metadata.docs.rs] [package.metadata.docs.rs]

View File

@ -4,56 +4,24 @@ A set of examples showcasing how to use rust-libp2p.
## Getting started ## Getting started
- [Ping](ping.rs)
Small `ping` clone, sending a ping to a peer, expecting a pong as a response. See
[tutorial](../src/tutorials/ping.rs) for a step-by-step guide building the example.
## Individual libp2p features ## Individual libp2p features
- [Chat](./chat.rs) - [Chat](./chat) A basic chat application demonstrating libp2p and the mDNS and Gossipsub protocols.
- [Distributed key-value store](./distributed-key-value-store) A basic key value store demonstrating libp2p and the mDNS and Kademlia protocol.
A basic chat application demonstrating libp2p and the mDNS and floodsub protocols. - [File sharing application](./file-sharing) Basic file sharing application with peers either providing or locating and getting files by name.
- [Gossipsub chat](./gossipsub-chat.rs)
Same as the chat example but using mDNS and the Gossipsub protocol.
- [Tokio based chat](./chat-tokio.rs)
Same as the chat example but using tokio for all asynchronous tasks and I/O.
- [Distributed key-value store](./distributed-key-value-store.rs)
A basic key value store demonstrating libp2p and the mDNS and Kademlia protocol.
- [Identify](../protocols/identify/examples/identify.rs)
Demonstrates how to use identify protocol to query peer information.
- [IPFS Kademlia](ipfs-kad.rs)
Demonstrates how to perform Kademlia queries on the IPFS network.
- [IPFS Private](ipfs-private.rs)
Implementation using the gossipsub, ping and identify protocols to implement the ipfs private
swarms feature.
- [Passive Discovery via MDNS](mdns-passive-discovery.rs)
Discover peers on the same network via the MDNS protocol.
- [Hole punching tutorial](https://docs.rs/libp2p/latest/libp2p/tutorials/hole_punching/index.html)
Tutorial on how to overcome firewalls and NATs with libp2ps hole punching mechanism.
## Integration into a larger application
- [File sharing application](./file-sharing.rs)
Basic file sharing application with peers either providing or locating and getting files by name.
While obviously showcasing how to build a basic file sharing application with the Kademlia and While obviously showcasing how to build a basic file sharing application with the Kademlia and
Request-Response protocol, the actual goal of this example is **to show how to integrate Request-Response protocol, the actual goal of this example is **to show how to integrate
rust-libp2p into a larger application**. rust-libp2p into a larger application**.
- [IPFS Kademlia](./ipfs-kad) Demonstrates how to perform Kademlia queries on the IPFS network.
- [IPFS Private](./ipfs-private) Implementation using the gossipsub, ping and identify protocols to implement the ipfs private swarms feature.
- [Ping](./ping) Small `ping` clone, sending a ping to a peer, expecting a pong as a response. See [tutorial](../src/tutorials/ping.rs) for a step-by-step guide building the example.
- [Rendezvous](./rendezvous) Rendezvous Protocol. See [specs](https://github.com/libp2p/specs/blob/master/rendezvous/README.md).

View File

@ -0,0 +1,12 @@
[package]
name = "chat-example"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
env_logger = "0.10.0"
futures = "0.3.26"
libp2p = { path = "../../", features = ["async-std", "dns", "gossipsub", "mdns", "mplex", "noise", "macros", "tcp", "websocket", "yamux"] }

View File

@ -24,7 +24,7 @@
//! Using two terminal windows, start two instances, typing the following in each: //! Using two terminal windows, start two instances, typing the following in each:
//! //!
//! ```sh //! ```sh
//! cargo run --example gossipsub-chat --features=full //! cargo run
//! ``` //! ```
//! //!
//! Mutual mDNS discovery may take a few seconds. When each peer does discover the other //! Mutual mDNS discovery may take a few seconds. When each peer does discover the other

View File

@ -1,157 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! A basic chat application demonstrating libp2p with the mDNS and floodsub protocols
//! using tokio for all asynchronous tasks and I/O. In order for all used libp2p
//! crates to use tokio, it enables tokio-specific features for some crates.
//!
//! The example is run per node as follows:
//!
//! ```sh
//! cargo run --example chat-tokio --features=full
//! ```
use futures::StreamExt;
use libp2p::{
core::upgrade,
floodsub::{self, Floodsub, FloodsubEvent},
identity, mdns, mplex, noise,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, Multiaddr, PeerId, Transport,
};
use std::error::Error;
use tokio::io::{self, AsyncBufReadExt};
/// The `tokio::main` attribute sets up a tokio runtime.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
// Create a random PeerId
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public());
println!("Local peer id: {peer_id:?}");
// Create a tokio-based TCP transport use noise for authenticated
// encryption and Mplex for multiplexing of substreams on a TCP stream.
let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true))
.upgrade(upgrade::Version::V1)
.authenticate(
noise::NoiseAuthenticated::xx(&id_keys)
.expect("Signing libp2p-noise static DH keypair failed."),
)
.multiplex(mplex::MplexConfig::new())
.boxed();
// Create a Floodsub topic
let floodsub_topic = floodsub::Topic::new("chat");
// We create a custom behaviour that combines floodsub and mDNS.
// The derive generates a delegating `NetworkBehaviour` impl.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "MyBehaviourEvent")]
struct MyBehaviour {
floodsub: Floodsub,
mdns: mdns::tokio::Behaviour,
}
#[allow(clippy::large_enum_variant)]
enum MyBehaviourEvent {
Floodsub(FloodsubEvent),
Mdns(mdns::Event),
}
impl From<FloodsubEvent> for MyBehaviourEvent {
fn from(event: FloodsubEvent) -> Self {
MyBehaviourEvent::Floodsub(event)
}
}
impl From<mdns::Event> for MyBehaviourEvent {
fn from(event: mdns::Event) -> Self {
MyBehaviourEvent::Mdns(event)
}
}
// Create a Swarm to manage peers and events.
let mdns_behaviour = mdns::Behaviour::new(Default::default(), peer_id)?;
let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(peer_id),
mdns: mdns_behaviour,
};
behaviour.floodsub.subscribe(floodsub_topic.clone());
let mut swarm = libp2p_swarm::Swarm::with_tokio_executor(transport, behaviour, peer_id);
// Reach out to another node if specified
if let Some(to_dial) = std::env::args().nth(1) {
let addr: Multiaddr = to_dial.parse()?;
swarm.dial(addr)?;
println!("Dialed {to_dial:?}");
}
// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
// Listen on all interfaces and whatever port the OS assigns
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off
loop {
tokio::select! {
line = stdin.next_line() => {
let line = line?.expect("stdin closed");
swarm.behaviour_mut().floodsub.publish(floodsub_topic.clone(), line.as_bytes());
}
event = swarm.select_next_some() => {
match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {address:?}");
}
SwarmEvent::Behaviour(MyBehaviourEvent::Floodsub(FloodsubEvent::Message(message))) => {
println!(
"Received: '{:?}' from {:?}",
String::from_utf8_lossy(&message.data),
message.source
);
}
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(event)) => {
match event {
mdns::Event::Discovered(list) => {
for (peer, _) in list {
swarm.behaviour_mut().floodsub.add_node_to_partial_view(peer);
}
}
mdns::Event::Expired(list) => {
for (peer, _) in list {
if !swarm.behaviour().mdns.has_node(&peer) {
swarm.behaviour_mut().floodsub.remove_node_from_partial_view(&peer);
}
}
}
}
}
_ => {}
}
}
}
}
}

View File

@ -1,179 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! A basic chat application demonstrating libp2p and the mDNS and floodsub protocols.
//!
//! Using two terminal windows, start two instances. If you local network allows mDNS,
//! they will automatically connect. Type a message in either terminal and hit return: the
//! message is sent and printed in the other terminal. Close with Ctrl-c.
//!
//! You can of course open more terminal windows and add more participants.
//! Dialing any of the other peers will propagate the new participant to all
//! chat members and everyone will receive all messages.
//!
//! # If they don't automatically connect
//!
//! If the nodes don't automatically connect, take note of the listening addresses of the first
//! instance and start the second with one of the addresses as the first argument. In the first
//! terminal window, run:
//!
//! ```sh
//! cargo run --example chat --features=full
//! ```
//!
//! It will print the PeerId and the listening addresses, e.g. `Listening on
//! "/ip4/0.0.0.0/tcp/24915"`
//!
//! In the second terminal window, start a new instance of the example with:
//!
//! ```sh
//! cargo run --example chat --features=full -- /ip4/127.0.0.1/tcp/24915
//! ```
//!
//! The two nodes then connect.
use async_std::io;
use futures::{
prelude::{stream::StreamExt, *},
select,
};
use libp2p::{
floodsub::{self, Floodsub, FloodsubEvent},
identity, mdns,
swarm::{NetworkBehaviour, SwarmEvent},
Multiaddr, PeerId, Swarm,
};
use std::error::Error;
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
// Create a random PeerId
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {local_peer_id:?}");
// Set up an encrypted DNS-enabled TCP Transport over the Mplex and Yamux protocols
let transport = libp2p::development_transport(local_key).await?;
// Create a Floodsub topic
let floodsub_topic = floodsub::Topic::new("chat");
// We create a custom network behaviour that combines floodsub and mDNS.
// Use the derive to generate delegating NetworkBehaviour impl.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent")]
struct MyBehaviour {
floodsub: Floodsub,
mdns: mdns::async_io::Behaviour,
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum OutEvent {
Floodsub(FloodsubEvent),
Mdns(mdns::Event),
}
impl From<mdns::Event> for OutEvent {
fn from(v: mdns::Event) -> Self {
Self::Mdns(v)
}
}
impl From<FloodsubEvent> for OutEvent {
fn from(v: FloodsubEvent) -> Self {
Self::Floodsub(v)
}
}
// Create a Swarm to manage peers and events
let mut swarm = {
let mdns = mdns::async_io::Behaviour::new(mdns::Config::default(), local_peer_id)?;
let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(local_peer_id),
mdns,
};
behaviour.floodsub.subscribe(floodsub_topic.clone());
Swarm::with_threadpool_executor(transport, behaviour, local_peer_id)
};
// Reach out to another node if specified
if let Some(to_dial) = std::env::args().nth(1) {
let addr: Multiaddr = to_dial.parse()?;
swarm.dial(addr)?;
println!("Dialed {to_dial:?}")
}
// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();
// Listen on all interfaces and whatever port the OS assigns
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off
loop {
select! {
line = stdin.select_next_some() => swarm
.behaviour_mut()
.floodsub
.publish(floodsub_topic.clone(), line.expect("Stdin not to close").as_bytes()),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {address:?}");
}
SwarmEvent::Behaviour(OutEvent::Floodsub(
FloodsubEvent::Message(message)
)) => {
println!(
"Received: '{:?}' from {:?}",
String::from_utf8_lossy(&message.data),
message.source
);
}
SwarmEvent::Behaviour(OutEvent::Mdns(
mdns::Event::Discovered(list)
)) => {
for (peer, _) in list {
swarm
.behaviour_mut()
.floodsub
.add_node_to_partial_view(peer);
}
}
SwarmEvent::Behaviour(OutEvent::Mdns(mdns::Event::Expired(
list
))) => {
for (peer, _) in list {
if !swarm.behaviour_mut().mdns.has_node(&peer) {
swarm
.behaviour_mut()
.floodsub
.remove_node_from_partial_view(&peer);
}
}
},
_ => {}
}
}
}
}

13
examples/dcutr/Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "dcutr"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
clap = { version = "4.1.6", features = ["derive"] }
env_logger = "0.10.0"
futures = "0.3.26"
futures-timer = "3.0"
libp2p = { path = "../../", features = ["async-std", "dns", "dcutr", "identify", "macros", "mplex", "noise", "ping", "relay", "rendezvous", "tcp", "tokio", "yamux"] }
log = "0.4"

View File

@ -19,22 +19,23 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use clap::Parser; use clap::Parser;
use futures::executor::{block_on, ThreadPool}; use futures::{
use futures::future::FutureExt; executor::{block_on, ThreadPool},
use futures::stream::StreamExt; future::FutureExt,
use libp2p_core::multiaddr::{Multiaddr, Protocol}; stream::StreamExt,
use libp2p_core::transport::OrTransport; };
use libp2p_core::upgrade; use libp2p::{
use libp2p_core::Transport; core::{
use libp2p_core::{identity, PeerId}; multiaddr::{Multiaddr, Protocol},
use libp2p_dcutr as dcutr; transport::{OrTransport, Transport},
use libp2p_dns::DnsConfig; upgrade, PeerId,
use libp2p_identify as identify; },
use libp2p_noise as noise; dcutr,
use libp2p_ping as ping; dns::DnsConfig,
use libp2p_relay as relay; identify, identity, noise, ping, relay,
use libp2p_swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
use libp2p_tcp as tcp; tcp, yamux,
};
use log::info; use log::info;
use std::error::Error; use std::error::Error;
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
@ -100,15 +101,11 @@ fn main() -> Result<(), Box<dyn Error>> {
noise::NoiseAuthenticated::xx(&local_key) noise::NoiseAuthenticated::xx(&local_key)
.expect("Signing libp2p-noise static DH keypair failed."), .expect("Signing libp2p-noise static DH keypair failed."),
) )
.multiplex(libp2p_yamux::YamuxConfig::default()) .multiplex(yamux::YamuxConfig::default())
.boxed(); .boxed();
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour( #[behaviour(out_event = "Event", event_process = false)]
out_event = "Event",
event_process = false,
prelude = "libp2p_swarm::derive_prelude"
)]
struct Behaviour { struct Behaviour {
relay_client: relay::client::Behaviour, relay_client: relay::client::Behaviour,
ping: ping::Behaviour, ping: ping::Behaviour,

View File

@ -0,0 +1,13 @@
[package]
name = "distributed-key-value-store"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
env_logger = "0.10"
futures = "0.3.26"
libp2p = { path = "../../", features = ["async-std", "dns", "kad", "mdns", "mplex", "noise", "macros", "tcp", "websocket", "yamux"] }
multiaddr = { version = "0.17.0" }

View File

@ -44,15 +44,14 @@ use async_std::io;
use futures::{prelude::*, select}; use futures::{prelude::*, select};
use libp2p::kad::record::store::MemoryStore; use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{ use libp2p::kad::{
record::Key, AddProviderOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult, record::Key, AddProviderOk, GetProvidersOk, GetRecordOk, Kademlia, KademliaEvent, PeerRecord,
Quorum, Record, PutRecordOk, QueryResult, Quorum, Record,
}; };
use libp2p::{ use libp2p::{
development_transport, identity, mdns, development_transport, identity, mdns,
swarm::{NetworkBehaviour, SwarmEvent}, swarm::{NetworkBehaviour, SwarmEvent},
PeerId, Swarm, PeerId, Swarm,
}; };
use libp2p_kad::{GetProvidersOk, GetRecordOk};
use std::error::Error; use std::error::Error;
#[async_std::main] #[async_std::main]

View File

@ -1,751 +0,0 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! # File sharing example
//!
//! Basic file sharing application with peers either providing or locating and
//! getting files by name.
//!
//! While obviously showcasing how to build a basic file sharing application,
//! the actual goal of this example is **to show how to integrate rust-libp2p
//! into a larger application**.
//!
//! ## Sample plot
//!
//! Assuming there are 3 nodes, A, B and C. A and B each provide a file while C
//! retrieves a file.
//!
//! Provider nodes A and B each provide a file, file FA and FB respectively.
//! They do so by advertising themselves as a provider for their file on a DHT
//! via [`libp2p-kad`]. The two, among other nodes of the network, are
//! interconnected via the DHT.
//!
//! Node C can locate the providers for file FA or FB on the DHT via
//! [`libp2p-kad`] without being connected to the specific node providing the
//! file, but any node of the DHT. Node C then connects to the corresponding
//! node and requests the file content of the file via
//! [`libp2p-request-response`].
//!
//! ## Architectural properties
//!
//! - Clean clonable async/await interface ([`Client`]) to interact with the
//! network layer.
//!
//! - Single task driving the network layer, no locks required.
//!
//! ## Usage
//!
//! A two node setup with one node providing the file and one node requesting the file.
//!
//! 1. Run command below in one terminal.
//!
//! ```
//! cargo run --example file-sharing --features=full -- \
//! --listen-address /ip4/127.0.0.1/tcp/40837 \
//! --secret-key-seed 1 \
//! provide \
//! --path <path-to-your-file> \
//! --name <name-for-others-to-find-your-file>
//! ```
//!
//! 2. Run command below in another terminal.
//!
//! ```
//! cargo run --example file-sharing --features=full -- \
//! --peer /ip4/127.0.0.1/tcp/40837/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X \
//! get \
//! --name <name-for-others-to-find-your-file>
//! ```
//!
//! Note: The client does not need to be directly connected to the providing
//! peer, as long as both are connected to some node on the same DHT.
use async_std::io;
use async_std::task::spawn;
use clap::Parser;
use futures::prelude::*;
use libp2p::core::{Multiaddr, PeerId};
use libp2p::multiaddr::Protocol;
use std::error::Error;
use std::io::Write;
use std::path::PathBuf;
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
let opt = Opt::parse();
let (mut network_client, mut network_events, network_event_loop) =
network::new(opt.secret_key_seed).await?;
// Spawn the network task for it to run in the background.
spawn(network_event_loop.run());
// In case a listen address was provided use it, otherwise listen on any
// address.
match opt.listen_address {
Some(addr) => network_client
.start_listening(addr)
.await
.expect("Listening not to fail."),
None => network_client
.start_listening("/ip4/0.0.0.0/tcp/0".parse()?)
.await
.expect("Listening not to fail."),
};
// In case the user provided an address of a peer on the CLI, dial it.
if let Some(addr) = opt.peer {
let peer_id = match addr.iter().last() {
Some(Protocol::P2p(hash)) => PeerId::from_multihash(hash).expect("Valid hash."),
_ => return Err("Expect peer multiaddr to contain peer ID.".into()),
};
network_client
.dial(peer_id, addr)
.await
.expect("Dial to succeed");
}
match opt.argument {
// Providing a file.
CliArgument::Provide { path, name } => {
// Advertise oneself as a provider of the file on the DHT.
network_client.start_providing(name.clone()).await;
loop {
match network_events.next().await {
// Reply with the content of the file on incoming requests.
Some(network::Event::InboundRequest { request, channel }) => {
if request == name {
network_client
.respond_file(std::fs::read(&path)?, channel)
.await;
}
}
e => todo!("{:?}", e),
}
}
}
// Locating and getting a file.
CliArgument::Get { name } => {
// Locate all nodes providing the file.
let providers = network_client.get_providers(name.clone()).await;
if providers.is_empty() {
return Err(format!("Could not find provider for file {name}.").into());
}
// Request the content of the file from each node.
let requests = providers.into_iter().map(|p| {
let mut network_client = network_client.clone();
let name = name.clone();
async move { network_client.request_file(p, name).await }.boxed()
});
// Await the requests, ignore the remaining once a single one succeeds.
let file_content = futures::future::select_ok(requests)
.await
.map_err(|_| "None of the providers returned file.")?
.0;
std::io::stdout().write_all(&file_content)?;
}
}
Ok(())
}
#[derive(Parser, Debug)]
#[clap(name = "libp2p file sharing example")]
struct Opt {
/// Fixed value to generate deterministic peer ID.
#[clap(long)]
secret_key_seed: Option<u8>,
#[clap(long)]
peer: Option<Multiaddr>,
#[clap(long)]
listen_address: Option<Multiaddr>,
#[clap(subcommand)]
argument: CliArgument,
}
#[derive(Debug, Parser)]
enum CliArgument {
Provide {
#[clap(long)]
path: PathBuf,
#[clap(long)]
name: String,
},
Get {
#[clap(long)]
name: String,
},
}
/// The network module, encapsulating all network related logic.
mod network {
use super::*;
use async_trait::async_trait;
use either::Either;
use futures::channel::{mpsc, oneshot};
use libp2p::core::upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName};
use libp2p::identity;
use libp2p::identity::ed25519;
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult};
use libp2p::multiaddr::Protocol;
use libp2p::request_response::{self, ProtocolSupport, RequestId, ResponseChannel};
use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent};
use std::collections::{hash_map, HashMap, HashSet};
use std::iter;
/// Creates the network components, namely:
///
/// - The network client to interact with the network layer from anywhere
/// within your application.
///
/// - The network event stream, e.g. for incoming requests.
///
/// - The network task driving the network itself.
pub async fn new(
secret_key_seed: Option<u8>,
) -> Result<(Client, impl Stream<Item = Event>, EventLoop), Box<dyn Error>> {
// Create a public/private key pair, either random or based on a seed.
let id_keys = match secret_key_seed {
Some(seed) => {
let mut bytes = [0u8; 32];
bytes[0] = seed;
let secret_key = ed25519::SecretKey::from_bytes(&mut bytes).expect(
"this returns `Err` only if the length is wrong; the length is correct; qed",
);
identity::Keypair::Ed25519(secret_key.into())
}
None => identity::Keypair::generate_ed25519(),
};
let peer_id = id_keys.public().to_peer_id();
// Build the Swarm, connecting the lower layer transport logic with the
// higher layer network behaviour logic.
let swarm = Swarm::with_threadpool_executor(
libp2p::development_transport(id_keys).await?,
ComposedBehaviour {
kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)),
request_response: request_response::Behaviour::new(
FileExchangeCodec(),
iter::once((FileExchangeProtocol(), ProtocolSupport::Full)),
Default::default(),
),
},
peer_id,
);
let (command_sender, command_receiver) = mpsc::channel(0);
let (event_sender, event_receiver) = mpsc::channel(0);
Ok((
Client {
sender: command_sender,
},
event_receiver,
EventLoop::new(swarm, command_receiver, event_sender),
))
}
#[derive(Clone)]
pub struct Client {
sender: mpsc::Sender<Command>,
}
impl Client {
/// Listen for incoming connections on the given address.
pub async fn start_listening(
&mut self,
addr: Multiaddr,
) -> Result<(), Box<dyn Error + Send>> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::StartListening { addr, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}
/// Dial the given peer at the given address.
pub async fn dial(
&mut self,
peer_id: PeerId,
peer_addr: Multiaddr,
) -> Result<(), Box<dyn Error + Send>> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::Dial {
peer_id,
peer_addr,
sender,
})
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}
/// Advertise the local node as the provider of the given file on the DHT.
pub async fn start_providing(&mut self, file_name: String) {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::StartProviding { file_name, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.");
}
/// Find the providers for the given file on the DHT.
pub async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::GetProviders { file_name, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}
/// Request the content of the given file from the given peer.
pub async fn request_file(
&mut self,
peer: PeerId,
file_name: String,
) -> Result<Vec<u8>, Box<dyn Error + Send>> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::RequestFile {
file_name,
peer,
sender,
})
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not be dropped.")
}
/// Respond with the provided file content to the given request.
pub async fn respond_file(
&mut self,
file: Vec<u8>,
channel: ResponseChannel<FileResponse>,
) {
self.sender
.send(Command::RespondFile { file, channel })
.await
.expect("Command receiver not to be dropped.");
}
}
pub struct EventLoop {
swarm: Swarm<ComposedBehaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_request_file:
HashMap<RequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
}
impl EventLoop {
fn new(
swarm: Swarm<ComposedBehaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
) -> Self {
Self {
swarm,
command_receiver,
event_sender,
pending_dial: Default::default(),
pending_start_providing: Default::default(),
pending_get_providers: Default::default(),
pending_request_file: Default::default(),
}
}
pub async fn run(mut self) {
loop {
futures::select! {
event = self.swarm.next() => self.handle_event(event.expect("Swarm stream to be infinite.")).await ,
command = self.command_receiver.next() => match command {
Some(c) => self.handle_command(c).await,
// Command channel closed, thus shutting down the network event loop.
None=> return,
},
}
}
}
async fn handle_event(
&mut self,
event: SwarmEvent<
ComposedEvent,
Either<ConnectionHandlerUpgrErr<io::Error>, io::Error>,
>,
) {
match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::StartProviding(_),
..
},
)) => {
let sender: oneshot::Sender<()> = self
.pending_start_providing
.remove(&id)
.expect("Completed query to be previously pending.");
let _ = sender.send(());
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
id,
result:
QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
providers,
..
})),
..
},
)) => {
if let Some(sender) = self.pending_get_providers.remove(&id) {
sender.send(providers).expect("Receiver not to be dropped");
// Finish the query. We are only interested in the first result.
self.swarm
.behaviour_mut()
.kademlia
.query_mut(&id)
.unwrap()
.finish();
}
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
result:
QueryResult::GetProviders(Ok(
GetProvidersOk::FinishedWithNoAdditionalRecord { .. },
)),
..
},
)) => {}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {}
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
request_response::Event::Message { message, .. },
)) => match message {
request_response::Message::Request {
request, channel, ..
} => {
self.event_sender
.send(Event::InboundRequest {
request: request.0,
channel,
})
.await
.expect("Event receiver not to be dropped.");
}
request_response::Message::Response {
request_id,
response,
} => {
let _ = self
.pending_request_file
.remove(&request_id)
.expect("Request to still be pending.")
.send(Ok(response.0));
}
},
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
request_response::Event::OutboundFailure {
request_id, error, ..
},
)) => {
let _ = self
.pending_request_file
.remove(&request_id)
.expect("Request to still be pending.")
.send(Err(Box::new(error)));
}
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
request_response::Event::ResponseSent { .. },
)) => {}
SwarmEvent::NewListenAddr { address, .. } => {
let local_peer_id = *self.swarm.local_peer_id();
eprintln!(
"Local node is listening on {:?}",
address.with(Protocol::P2p(local_peer_id.into()))
);
}
SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
if endpoint.is_dialer() {
if let Some(sender) = self.pending_dial.remove(&peer_id) {
let _ = sender.send(Ok(()));
}
}
}
SwarmEvent::ConnectionClosed { .. } => {}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
if let Some(peer_id) = peer_id {
if let Some(sender) = self.pending_dial.remove(&peer_id) {
let _ = sender.send(Err(Box::new(error)));
}
}
}
SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::Dialing(peer_id) => eprintln!("Dialing {peer_id}"),
e => panic!("{e:?}"),
}
}
async fn handle_command(&mut self, command: Command) {
match command {
Command::StartListening { addr, sender } => {
let _ = match self.swarm.listen_on(addr) {
Ok(_) => sender.send(Ok(())),
Err(e) => sender.send(Err(Box::new(e))),
};
}
Command::Dial {
peer_id,
peer_addr,
sender,
} => {
if let hash_map::Entry::Vacant(e) = self.pending_dial.entry(peer_id) {
self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, peer_addr.clone());
match self
.swarm
.dial(peer_addr.with(Protocol::P2p(peer_id.into())))
{
Ok(()) => {
e.insert(sender);
}
Err(e) => {
let _ = sender.send(Err(Box::new(e)));
}
}
} else {
todo!("Already dialing peer.");
}
}
Command::StartProviding { file_name, sender } => {
let query_id = self
.swarm
.behaviour_mut()
.kademlia
.start_providing(file_name.into_bytes().into())
.expect("No store error.");
self.pending_start_providing.insert(query_id, sender);
}
Command::GetProviders { file_name, sender } => {
let query_id = self
.swarm
.behaviour_mut()
.kademlia
.get_providers(file_name.into_bytes().into());
self.pending_get_providers.insert(query_id, sender);
}
Command::RequestFile {
file_name,
peer,
sender,
} => {
let request_id = self
.swarm
.behaviour_mut()
.request_response
.send_request(&peer, FileRequest(file_name));
self.pending_request_file.insert(request_id, sender);
}
Command::RespondFile { file, channel } => {
self.swarm
.behaviour_mut()
.request_response
.send_response(channel, FileResponse(file))
.expect("Connection to peer to be still open.");
}
}
}
}
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "ComposedEvent")]
struct ComposedBehaviour {
request_response: request_response::Behaviour<FileExchangeCodec>,
kademlia: Kademlia<MemoryStore>,
}
#[derive(Debug)]
enum ComposedEvent {
RequestResponse(request_response::Event<FileRequest, FileResponse>),
Kademlia(KademliaEvent),
}
impl From<request_response::Event<FileRequest, FileResponse>> for ComposedEvent {
fn from(event: request_response::Event<FileRequest, FileResponse>) -> Self {
ComposedEvent::RequestResponse(event)
}
}
impl From<KademliaEvent> for ComposedEvent {
fn from(event: KademliaEvent) -> Self {
ComposedEvent::Kademlia(event)
}
}
#[derive(Debug)]
enum Command {
StartListening {
addr: Multiaddr,
sender: oneshot::Sender<Result<(), Box<dyn Error + Send>>>,
},
Dial {
peer_id: PeerId,
peer_addr: Multiaddr,
sender: oneshot::Sender<Result<(), Box<dyn Error + Send>>>,
},
StartProviding {
file_name: String,
sender: oneshot::Sender<()>,
},
GetProviders {
file_name: String,
sender: oneshot::Sender<HashSet<PeerId>>,
},
RequestFile {
file_name: String,
peer: PeerId,
sender: oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>,
},
RespondFile {
file: Vec<u8>,
channel: ResponseChannel<FileResponse>,
},
}
#[derive(Debug)]
pub enum Event {
InboundRequest {
request: String,
channel: ResponseChannel<FileResponse>,
},
}
// Simple file exchange protocol
#[derive(Debug, Clone)]
struct FileExchangeProtocol();
#[derive(Clone)]
struct FileExchangeCodec();
#[derive(Debug, Clone, PartialEq, Eq)]
struct FileRequest(String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileResponse(Vec<u8>);
impl ProtocolName for FileExchangeProtocol {
fn protocol_name(&self) -> &[u8] {
"/file-exchange/1".as_bytes()
}
}
#[async_trait]
impl request_response::Codec for FileExchangeCodec {
type Protocol = FileExchangeProtocol;
type Request = FileRequest;
type Response = FileResponse;
async fn read_request<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
let vec = read_length_prefixed(io, 1_000_000).await?;
if vec.is_empty() {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok(FileRequest(String::from_utf8(vec).unwrap()))
}
async fn read_response<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
let vec = read_length_prefixed(io, 500_000_000).await?; // update transfer maximum
if vec.is_empty() {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok(FileResponse(vec))
}
async fn write_request<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
FileRequest(data): FileRequest,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
write_length_prefixed(io, data).await?;
io.close().await?;
Ok(())
}
async fn write_response<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
FileResponse(data): FileResponse,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
write_length_prefixed(io, data).await?;
io.close().await?;
Ok(())
}
}
}

View File

@ -0,0 +1,15 @@
[package]
name = "file-sharing"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
clap = { version = "4.1.6", features = ["derive"] }
either = "1.8"
env_logger = "0.10"
futures = "0.3.26"
libp2p = { path = "../../", features = ["async-std", "dns", "kad", "mplex", "noise", "macros", "request-response", "tcp", "websocket", "yamux"] }
multiaddr = { version = "0.17.0" }

View File

@ -0,0 +1,206 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! # File sharing example
//!
//! Basic file sharing application with peers either providing or locating and
//! getting files by name.
//!
//! While obviously showcasing how to build a basic file sharing application,
//! the actual goal of this example is **to show how to integrate rust-libp2p
//! into a larger application**.
//!
//! ## Sample plot
//!
//! Assuming there are 3 nodes, A, B and C. A and B each provide a file while C
//! retrieves a file.
//!
//! Provider nodes A and B each provide a file, file FA and FB respectively.
//! They do so by advertising themselves as a provider for their file on a DHT
//! via [`libp2p-kad`]. The two, among other nodes of the network, are
//! interconnected via the DHT.
//!
//! Node C can locate the providers for file FA or FB on the DHT via
//! [`libp2p-kad`] without being connected to the specific node providing the
//! file, but any node of the DHT. Node C then connects to the corresponding
//! node and requests the file content of the file via
//! [`libp2p-request-response`].
//!
//! ## Architectural properties
//!
//! - Clean clonable async/await interface ([`Client`](network::Client)) to interact with the
//! network layer.
//!
//! - Single task driving the network layer, no locks required.
//!
//! ## Usage
//!
//! A two node setup with one node providing the file and one node requesting the file.
//!
//! 1. Run command below in one terminal.
//!
//! ```sh
//! cargo run -- --listen-address /ip4/127.0.0.1/tcp/40837 \
//! --secret-key-seed 1 \
//! provide \
//! --path <path-to-your-file> \
//! --name <name-for-others-to-find-your-file>
//! ```
//!
//! 2. Run command below in another terminal.
//!
//! ```sh
//! cargo run -- --peer /ip4/127.0.0.1/tcp/40837/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X \
//! get \
//! --name <name-for-others-to-find-your-file>
//! ```
//!
//! Note: The client does not need to be directly connected to the providing
//! peer, as long as both are connected to some node on the same DHT.
mod network;
use async_std::task::spawn;
use clap::Parser;
use futures::prelude::*;
use futures::StreamExt;
use libp2p::{
core::{Multiaddr, PeerId},
multiaddr::Protocol,
};
use std::error::Error;
use std::io::Write;
use std::path::PathBuf;
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
let opt = Opt::parse();
let (mut network_client, mut network_events, network_event_loop) =
network::new(opt.secret_key_seed).await?;
// Spawn the network task for it to run in the background.
spawn(network_event_loop.run());
// In case a listen address was provided use it, otherwise listen on any
// address.
match opt.listen_address {
Some(addr) => network_client
.start_listening(addr)
.await
.expect("Listening not to fail."),
None => network_client
.start_listening("/ip4/0.0.0.0/tcp/0".parse()?)
.await
.expect("Listening not to fail."),
};
// In case the user provided an address of a peer on the CLI, dial it.
if let Some(addr) = opt.peer {
let peer_id = match addr.iter().last() {
Some(Protocol::P2p(hash)) => PeerId::from_multihash(hash).expect("Valid hash."),
_ => return Err("Expect peer multiaddr to contain peer ID.".into()),
};
network_client
.dial(peer_id, addr)
.await
.expect("Dial to succeed");
}
match opt.argument {
// Providing a file.
CliArgument::Provide { path, name } => {
// Advertise oneself as a provider of the file on the DHT.
network_client.start_providing(name.clone()).await;
loop {
match network_events.next().await {
// Reply with the content of the file on incoming requests.
Some(network::Event::InboundRequest { request, channel }) => {
if request == name {
network_client
.respond_file(std::fs::read(&path)?, channel)
.await;
}
}
e => todo!("{:?}", e),
}
}
}
// Locating and getting a file.
CliArgument::Get { name } => {
// Locate all nodes providing the file.
let providers = network_client.get_providers(name.clone()).await;
if providers.is_empty() {
return Err(format!("Could not find provider for file {name}.").into());
}
// Request the content of the file from each node.
let requests = providers.into_iter().map(|p| {
let mut network_client = network_client.clone();
let name = name.clone();
async move { network_client.request_file(p, name).await }.boxed()
});
// Await the requests, ignore the remaining once a single one succeeds.
let file_content = futures::future::select_ok(requests)
.await
.map_err(|_| "None of the providers returned file.")?
.0;
std::io::stdout().write_all(&file_content)?;
}
}
Ok(())
}
#[derive(Parser, Debug)]
#[clap(name = "libp2p file sharing example")]
struct Opt {
/// Fixed value to generate deterministic peer ID.
#[clap(long)]
secret_key_seed: Option<u8>,
#[clap(long)]
peer: Option<Multiaddr>,
#[clap(long)]
listen_address: Option<Multiaddr>,
#[clap(subcommand)]
argument: CliArgument,
}
#[derive(Debug, Parser)]
enum CliArgument {
Provide {
#[clap(long)]
path: PathBuf,
#[clap(long)]
name: String,
},
Get {
#[clap(long)]
name: String,
},
}

View File

@ -0,0 +1,541 @@
use async_std::io;
use async_trait::async_trait;
use either::Either;
use futures::channel::{mpsc, oneshot};
use futures::prelude::*;
use libp2p::{
core::{
upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName},
Multiaddr, PeerId,
},
identity,
kad::{
record::store::MemoryStore, GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult,
},
multiaddr::Protocol,
request_response::{self, ProtocolSupport, RequestId, ResponseChannel},
swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent},
};
use std::collections::{hash_map, HashMap, HashSet};
use std::error::Error;
use std::iter;
/// Creates the network components, namely:
///
/// - The network client to interact with the network layer from anywhere
/// within your application.
///
/// - The network event stream, e.g. for incoming requests.
///
/// - The network task driving the network itself.
pub async fn new(
secret_key_seed: Option<u8>,
) -> Result<(Client, impl Stream<Item = Event>, EventLoop), Box<dyn Error>> {
// Create a public/private key pair, either random or based on a seed.
let id_keys = match secret_key_seed {
Some(seed) => {
let mut bytes = [0u8; 32];
bytes[0] = seed;
let secret_key = identity::ed25519::SecretKey::from_bytes(&mut bytes).expect(
"this returns `Err` only if the length is wrong; the length is correct; qed",
);
identity::Keypair::Ed25519(secret_key.into())
}
None => identity::Keypair::generate_ed25519(),
};
let peer_id = id_keys.public().to_peer_id();
// Build the Swarm, connecting the lower layer transport logic with the
// higher layer network behaviour logic.
let swarm = Swarm::with_threadpool_executor(
libp2p::development_transport(id_keys).await?,
ComposedBehaviour {
kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)),
request_response: request_response::Behaviour::new(
FileExchangeCodec(),
iter::once((FileExchangeProtocol(), ProtocolSupport::Full)),
Default::default(),
),
},
peer_id,
);
let (command_sender, command_receiver) = mpsc::channel(0);
let (event_sender, event_receiver) = mpsc::channel(0);
Ok((
Client {
sender: command_sender,
},
event_receiver,
EventLoop::new(swarm, command_receiver, event_sender),
))
}
#[derive(Clone)]
pub struct Client {
sender: mpsc::Sender<Command>,
}
impl Client {
/// Listen for incoming connections on the given address.
pub async fn start_listening(&mut self, addr: Multiaddr) -> Result<(), Box<dyn Error + Send>> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::StartListening { addr, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}
/// Dial the given peer at the given address.
pub async fn dial(
&mut self,
peer_id: PeerId,
peer_addr: Multiaddr,
) -> Result<(), Box<dyn Error + Send>> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::Dial {
peer_id,
peer_addr,
sender,
})
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}
/// Advertise the local node as the provider of the given file on the DHT.
pub async fn start_providing(&mut self, file_name: String) {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::StartProviding { file_name, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.");
}
/// Find the providers for the given file on the DHT.
pub async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::GetProviders { file_name, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}
/// Request the content of the given file from the given peer.
pub async fn request_file(
&mut self,
peer: PeerId,
file_name: String,
) -> Result<Vec<u8>, Box<dyn Error + Send>> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::RequestFile {
file_name,
peer,
sender,
})
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not be dropped.")
}
/// Respond with the provided file content to the given request.
pub async fn respond_file(&mut self, file: Vec<u8>, channel: ResponseChannel<FileResponse>) {
self.sender
.send(Command::RespondFile { file, channel })
.await
.expect("Command receiver not to be dropped.");
}
}
pub struct EventLoop {
swarm: Swarm<ComposedBehaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_request_file:
HashMap<RequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
}
impl EventLoop {
fn new(
swarm: Swarm<ComposedBehaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
) -> Self {
Self {
swarm,
command_receiver,
event_sender,
pending_dial: Default::default(),
pending_start_providing: Default::default(),
pending_get_providers: Default::default(),
pending_request_file: Default::default(),
}
}
pub async fn run(mut self) {
loop {
futures::select! {
event = self.swarm.next() => self.handle_event(event.expect("Swarm stream to be infinite.")).await ,
command = self.command_receiver.next() => match command {
Some(c) => self.handle_command(c).await,
// Command channel closed, thus shutting down the network event loop.
None=> return,
},
}
}
}
async fn handle_event(
&mut self,
event: SwarmEvent<ComposedEvent, Either<ConnectionHandlerUpgrErr<io::Error>, io::Error>>,
) {
match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::StartProviding(_),
..
},
)) => {
let sender: oneshot::Sender<()> = self
.pending_start_providing
.remove(&id)
.expect("Completed query to be previously pending.");
let _ = sender.send(());
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
id,
result:
QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
providers, ..
})),
..
},
)) => {
if let Some(sender) = self.pending_get_providers.remove(&id) {
sender.send(providers).expect("Receiver not to be dropped");
// Finish the query. We are only interested in the first result.
self.swarm
.behaviour_mut()
.kademlia
.query_mut(&id)
.unwrap()
.finish();
}
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
result:
QueryResult::GetProviders(Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
..
})),
..
},
)) => {}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {}
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
request_response::Event::Message { message, .. },
)) => match message {
request_response::Message::Request {
request, channel, ..
} => {
self.event_sender
.send(Event::InboundRequest {
request: request.0,
channel,
})
.await
.expect("Event receiver not to be dropped.");
}
request_response::Message::Response {
request_id,
response,
} => {
let _ = self
.pending_request_file
.remove(&request_id)
.expect("Request to still be pending.")
.send(Ok(response.0));
}
},
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
request_response::Event::OutboundFailure {
request_id, error, ..
},
)) => {
let _ = self
.pending_request_file
.remove(&request_id)
.expect("Request to still be pending.")
.send(Err(Box::new(error)));
}
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
request_response::Event::ResponseSent { .. },
)) => {}
SwarmEvent::NewListenAddr { address, .. } => {
let local_peer_id = *self.swarm.local_peer_id();
eprintln!(
"Local node is listening on {:?}",
address.with(Protocol::P2p(local_peer_id.into()))
);
}
SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
if endpoint.is_dialer() {
if let Some(sender) = self.pending_dial.remove(&peer_id) {
let _ = sender.send(Ok(()));
}
}
}
SwarmEvent::ConnectionClosed { .. } => {}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
if let Some(peer_id) = peer_id {
if let Some(sender) = self.pending_dial.remove(&peer_id) {
let _ = sender.send(Err(Box::new(error)));
}
}
}
SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::Dialing(peer_id) => eprintln!("Dialing {peer_id}"),
e => panic!("{e:?}"),
}
}
async fn handle_command(&mut self, command: Command) {
match command {
Command::StartListening { addr, sender } => {
let _ = match self.swarm.listen_on(addr) {
Ok(_) => sender.send(Ok(())),
Err(e) => sender.send(Err(Box::new(e))),
};
}
Command::Dial {
peer_id,
peer_addr,
sender,
} => {
if let hash_map::Entry::Vacant(e) = self.pending_dial.entry(peer_id) {
self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, peer_addr.clone());
match self
.swarm
.dial(peer_addr.with(Protocol::P2p(peer_id.into())))
{
Ok(()) => {
e.insert(sender);
}
Err(e) => {
let _ = sender.send(Err(Box::new(e)));
}
}
} else {
todo!("Already dialing peer.");
}
}
Command::StartProviding { file_name, sender } => {
let query_id = self
.swarm
.behaviour_mut()
.kademlia
.start_providing(file_name.into_bytes().into())
.expect("No store error.");
self.pending_start_providing.insert(query_id, sender);
}
Command::GetProviders { file_name, sender } => {
let query_id = self
.swarm
.behaviour_mut()
.kademlia
.get_providers(file_name.into_bytes().into());
self.pending_get_providers.insert(query_id, sender);
}
Command::RequestFile {
file_name,
peer,
sender,
} => {
let request_id = self
.swarm
.behaviour_mut()
.request_response
.send_request(&peer, FileRequest(file_name));
self.pending_request_file.insert(request_id, sender);
}
Command::RespondFile { file, channel } => {
self.swarm
.behaviour_mut()
.request_response
.send_response(channel, FileResponse(file))
.expect("Connection to peer to be still open.");
}
}
}
}
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "ComposedEvent")]
struct ComposedBehaviour {
request_response: request_response::Behaviour<FileExchangeCodec>,
kademlia: Kademlia<MemoryStore>,
}
#[derive(Debug)]
enum ComposedEvent {
RequestResponse(request_response::Event<FileRequest, FileResponse>),
Kademlia(KademliaEvent),
}
impl From<request_response::Event<FileRequest, FileResponse>> for ComposedEvent {
fn from(event: request_response::Event<FileRequest, FileResponse>) -> Self {
ComposedEvent::RequestResponse(event)
}
}
impl From<KademliaEvent> for ComposedEvent {
fn from(event: KademliaEvent) -> Self {
ComposedEvent::Kademlia(event)
}
}
#[derive(Debug)]
enum Command {
StartListening {
addr: Multiaddr,
sender: oneshot::Sender<Result<(), Box<dyn Error + Send>>>,
},
Dial {
peer_id: PeerId,
peer_addr: Multiaddr,
sender: oneshot::Sender<Result<(), Box<dyn Error + Send>>>,
},
StartProviding {
file_name: String,
sender: oneshot::Sender<()>,
},
GetProviders {
file_name: String,
sender: oneshot::Sender<HashSet<PeerId>>,
},
RequestFile {
file_name: String,
peer: PeerId,
sender: oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>,
},
RespondFile {
file: Vec<u8>,
channel: ResponseChannel<FileResponse>,
},
}
#[derive(Debug)]
pub enum Event {
InboundRequest {
request: String,
channel: ResponseChannel<FileResponse>,
},
}
// Simple file exchange protocol
#[derive(Debug, Clone)]
struct FileExchangeProtocol();
#[derive(Clone)]
struct FileExchangeCodec();
#[derive(Debug, Clone, PartialEq, Eq)]
struct FileRequest(String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileResponse(Vec<u8>);
impl ProtocolName for FileExchangeProtocol {
fn protocol_name(&self) -> &[u8] {
"/file-exchange/1".as_bytes()
}
}
#[async_trait]
impl request_response::Codec for FileExchangeCodec {
type Protocol = FileExchangeProtocol;
type Request = FileRequest;
type Response = FileResponse;
async fn read_request<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
let vec = read_length_prefixed(io, 1_000_000).await?;
if vec.is_empty() {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok(FileRequest(String::from_utf8(vec).unwrap()))
}
async fn read_response<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
let vec = read_length_prefixed(io, 500_000_000).await?; // update transfer maximum
if vec.is_empty() {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok(FileResponse(vec))
}
async fn write_request<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
FileRequest(data): FileRequest,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
write_length_prefixed(io, data).await?;
io.close().await?;
Ok(())
}
async fn write_response<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
FileResponse(data): FileResponse,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
write_length_prefixed(io, data).await?;
io.close().await?;
Ok(())
}
}

View File

@ -0,0 +1,11 @@
[package]
name = "identify"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
futures = "0.3.26"
libp2p = { path = "../../", features = ["async-std", "dns", "dcutr", "identify", "macros", "mplex", "noise", "ping", "relay", "rendezvous", "tcp", "tokio", "yamux"] }

View File

@ -23,7 +23,7 @@
//! In the first terminal window, run: //! In the first terminal window, run:
//! //!
//! ```sh //! ```sh
//! cargo run --example identify //! cargo run
//! ``` //! ```
//! It will print the [`PeerId`] and the listening addresses, e.g. `Listening on //! It will print the [`PeerId`] and the listening addresses, e.g. `Listening on
//! "/ip4/127.0.0.1/tcp/24915"` //! "/ip4/127.0.0.1/tcp/24915"`
@ -31,16 +31,18 @@
//! In the second terminal window, start a new instance of the example with: //! In the second terminal window, start a new instance of the example with:
//! //!
//! ```sh //! ```sh
//! cargo run --example identify -- /ip4/127.0.0.1/tcp/24915 //! cargo run -- /ip4/127.0.0.1/tcp/24915
//! ``` //! ```
//! The two nodes establish a connection, negotiate the identify protocol //! The two nodes establish a connection, negotiate the identify protocol
//! and will send each other identify info which is then printed to the console. //! and will send each other identify info which is then printed to the console.
use futures::prelude::*; use futures::prelude::*;
use libp2p_core::upgrade::Version; use libp2p::{
use libp2p_core::{identity, Multiaddr, PeerId, Transport}; core::{multiaddr::Multiaddr, upgrade::Version, PeerId},
use libp2p_identify as identify; identify, identity, noise,
use libp2p_swarm::{Swarm, SwarmEvent}; swarm::{Swarm, SwarmEvent},
tcp, yamux, Transport,
};
use std::error::Error; use std::error::Error;
#[async_std::main] #[async_std::main]
@ -49,10 +51,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
let local_peer_id = PeerId::from(local_key.public()); let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {local_peer_id:?}"); println!("Local peer id: {local_peer_id:?}");
let transport = libp2p_tcp::async_io::Transport::default() let transport = tcp::async_io::Transport::default()
.upgrade(Version::V1) .upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&local_key).unwrap()) .authenticate(noise::NoiseAuthenticated::xx(&local_key).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default()) .multiplex(yamux::YamuxConfig::default())
.boxed(); .boxed();
// Create a identify network behaviour. // Create a identify network behaviour.

View File

@ -0,0 +1,12 @@
[package]
name = "ipfs-kad"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
env_logger = "0.10"
futures = "0.3.26"
libp2p = { path = "../../", features = ["async-std", "dns", "kad", "mplex", "noise", "tcp", "websocket", "yamux"] }

View File

@ -0,0 +1,14 @@
[package]
name = "ipfs-private"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
either = "1.8"
env_logger = "0.10"
futures = "0.3.26"
libp2p = { path = "../../", features = ["async-std", "gossipsub", "dns", "identify", "kad", "macros", "mplex", "noise", "ping", "pnet", "tcp", "websocket", "yamux"] }
multiaddr = { version = "0.17.0" }

View File

@ -1,65 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::StreamExt;
use libp2p::{
identity, mdns,
swarm::{Swarm, SwarmEvent},
PeerId,
};
use std::error::Error;
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
// Create a random PeerId.
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public());
println!("Local peer id: {peer_id:?}");
// Create a transport.
let transport = libp2p::development_transport(id_keys).await?;
// Create an MDNS network behaviour.
let behaviour = mdns::async_io::Behaviour::new(mdns::Config::default(), peer_id)?;
// Create a Swarm that establishes connections through the given transport.
// Note that the MDNS behaviour itself will not actually inititiate any connections,
// as it only uses UDP.
let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id);
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(mdns::Event::Discovered(peers)) => {
for (peer, addr) in peers {
println!("discovered {peer} {addr}");
}
}
SwarmEvent::Behaviour(mdns::Event::Expired(expired)) => {
for (peer, addr) in expired {
println!("expired {peer} {addr}");
}
}
_ => {}
}
}
}

View File

@ -0,0 +1,12 @@
[package]
name = "ping-example"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
futures = "0.3.26"
libp2p = { path = "../../", features = ["async-std", "dns", "macros", "mplex", "noise", "ping", "tcp", "websocket", "yamux"] }
multiaddr = { version = "0.17.0" }

View File

@ -25,7 +25,7 @@
//! In the first terminal window, run: //! In the first terminal window, run:
//! //!
//! ```sh //! ```sh
//! cargo run --example ping --features=full //! cargo run
//! ``` //! ```
//! //!
//! It will print the PeerId and the listening addresses, e.g. `Listening on //! It will print the PeerId and the listening addresses, e.g. `Listening on
@ -34,15 +34,18 @@
//! In the second terminal window, start a new instance of the example with: //! In the second terminal window, start a new instance of the example with:
//! //!
//! ```sh //! ```sh
//! cargo run --example ping --features=full -- /ip4/127.0.0.1/tcp/24915 //! cargo run -- /ip4/127.0.0.1/tcp/24915
//! ``` //! ```
//! //!
//! The two nodes establish a connection, negotiate the ping protocol //! The two nodes establish a connection, negotiate the ping protocol
//! and begin pinging each other. //! and begin pinging each other.
use futures::prelude::*; use futures::prelude::*;
use libp2p::swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::{
use libp2p::{identity, ping, Multiaddr, PeerId}; identity, ping,
swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent},
Multiaddr, PeerId,
};
use std::error::Error; use std::error::Error;
#[async_std::main] #[async_std::main]
@ -78,7 +81,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
/// Our network behaviour. /// Our network behaviour.
/// ///
/// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of /// For illustrative purposes, this includes the [`KeepAlive`](keep_alive::Behaviour) behaviour so a continuous sequence of
/// pings can be observed. /// pings can be observed.
#[derive(NetworkBehaviour, Default)] #[derive(NetworkBehaviour, Default)]
struct Behaviour { struct Behaviour {

View File

@ -0,0 +1,14 @@
[package]
name = "rendezvous-example"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
env_logger = "0.10.0"
futures = "0.3.26"
libp2p = { path = "../../", features = ["async-std", "identify", "macros", "mplex", "noise", "ping", "rendezvous", "tcp", "tokio", "yamux"] }
log = "0.4"
tokio = { version = "1.25", features = [ "rt-multi-thread", "macros", "time" ] }

View File

@ -19,12 +19,15 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::StreamExt; use futures::StreamExt;
use libp2p_core::{identity, multiaddr::Protocol, upgrade::Version, Multiaddr, PeerId, Transport}; use libp2p::{
use libp2p_ping as ping; core::transport::upgrade::Version,
use libp2p_rendezvous as rendezvous; identity,
use libp2p_swarm::{keep_alive, Swarm, SwarmEvent}; multiaddr::Protocol,
noise, ping, rendezvous,
swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent},
tcp, yamux, Multiaddr, PeerId, Transport,
};
use std::time::Duration; use std::time::Duration;
use void::Void;
const NAMESPACE: &str = "rendezvous"; const NAMESPACE: &str = "rendezvous";
@ -32,24 +35,24 @@ const NAMESPACE: &str = "rendezvous";
async fn main() { async fn main() {
env_logger::init(); env_logger::init();
let identity = identity::Keypair::generate_ed25519(); let key_pair = identity::Keypair::generate_ed25519();
let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::<Multiaddr>().unwrap(); let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::<Multiaddr>().unwrap();
let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN" let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN"
.parse() .parse()
.unwrap(); .unwrap();
let mut swarm = Swarm::with_tokio_executor( let mut swarm = Swarm::with_tokio_executor(
libp2p_tcp::tokio::Transport::default() tcp::tokio::Transport::default()
.upgrade(Version::V1) .upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) .authenticate(noise::NoiseAuthenticated::xx(&key_pair).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default()) .multiplex(yamux::YamuxConfig::default())
.boxed(), .boxed(),
MyBehaviour { MyBehaviour {
rendezvous: rendezvous::client::Behaviour::new(identity.clone()), rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()),
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
keep_alive: keep_alive::Behaviour, keep_alive: keep_alive::Behaviour,
}, },
PeerId::from(identity.public()), PeerId::from(key_pair.public()),
); );
log::info!("Local peer id: {}", swarm.local_peer_id()); log::info!("Local peer id: {}", swarm.local_peer_id());
@ -75,7 +78,7 @@ async fn main() {
rendezvous_point, rendezvous_point,
); );
} }
SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Discovered { SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(rendezvous::client::Event::Discovered {
registrations, registrations,
cookie: new_cookie, cookie: new_cookie,
.. ..
@ -99,7 +102,7 @@ async fn main() {
} }
} }
} }
SwarmEvent::Behaviour(MyEvent::Ping(ping::Event { SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping::Event {
peer, peer,
result: Ok(ping::Success::Ping { rtt }), result: Ok(ping::Success::Ping { rtt }),
})) if peer != rendezvous_point => { })) if peer != rendezvous_point => {
@ -120,36 +123,7 @@ async fn main() {
} }
} }
#[derive(Debug)] #[derive(NetworkBehaviour)]
enum MyEvent {
Rendezvous(rendezvous::client::Event),
Ping(ping::Event),
}
impl From<rendezvous::client::Event> for MyEvent {
fn from(event: rendezvous::client::Event) -> Self {
MyEvent::Rendezvous(event)
}
}
impl From<ping::Event> for MyEvent {
fn from(event: ping::Event) -> Self {
MyEvent::Ping(event)
}
}
impl From<Void> for MyEvent {
fn from(event: Void) -> Self {
void::unreachable(event)
}
}
#[derive(libp2p_swarm::NetworkBehaviour)]
#[behaviour(
out_event = "MyEvent",
event_process = false,
prelude = "libp2p_swarm::derive_prelude"
)]
struct MyBehaviour { struct MyBehaviour {
rendezvous: rendezvous::client::Behaviour, rendezvous: rendezvous::client::Behaviour,
ping: ping::Behaviour, ping: ping::Behaviour,

View File

@ -19,48 +19,47 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::StreamExt; use futures::StreamExt;
use libp2p_core::{identity, upgrade::Version, Multiaddr, PeerId, Transport}; use libp2p::{
use libp2p_identify as identify; core::transport::upgrade::Version,
use libp2p_ping as ping; identify, identity, noise, ping, rendezvous,
use libp2p_rendezvous as rendezvous; swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent},
use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; tcp, yamux, Multiaddr, PeerId, Transport,
};
use std::time::Duration; use std::time::Duration;
use void::Void;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
env_logger::init(); env_logger::init();
let key_pair = identity::Keypair::generate_ed25519();
let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::<Multiaddr>().unwrap(); let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::<Multiaddr>().unwrap();
let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN" let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN"
.parse() .parse()
.unwrap(); .unwrap();
let identity = identity::Keypair::generate_ed25519();
let mut swarm = Swarm::with_tokio_executor( let mut swarm = Swarm::with_tokio_executor(
libp2p_tcp::tokio::Transport::default() tcp::tokio::Transport::default()
.upgrade(Version::V1) .upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) .authenticate(noise::NoiseAuthenticated::xx(&key_pair).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default()) .multiplex(yamux::YamuxConfig::default())
.boxed(), .boxed(),
MyBehaviour { MyBehaviour {
identify: identify::Behaviour::new(identify::Config::new( identify: identify::Behaviour::new(identify::Config::new(
"rendezvous-example/1.0.0".to_string(), "rendezvous-example/1.0.0".to_string(),
identity.public(), key_pair.public(),
)), )),
rendezvous: rendezvous::client::Behaviour::new(identity.clone()), rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()),
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
keep_alive: keep_alive::Behaviour, keep_alive: keep_alive::Behaviour,
}, },
PeerId::from(identity.public()), PeerId::from(key_pair.public()),
); );
log::info!("Local peer id: {}", swarm.local_peer_id()); log::info!("Local peer id: {}", swarm.local_peer_id());
let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap());
swarm.dial(rendezvous_point_address).unwrap(); swarm.dial(rendezvous_point_address.clone()).unwrap();
while let Some(event) = swarm.next().await { while let Some(event) = swarm.next().await {
match event { match event {
@ -75,18 +74,22 @@ async fn main() {
log::error!("Lost connection to rendezvous point {}", error); log::error!("Lost connection to rendezvous point {}", error);
} }
// once `/identify` did its job, we know our external address and can register // once `/identify` did its job, we know our external address and can register
SwarmEvent::Behaviour(MyEvent::Identify(identify::Event::Received { .. })) => { SwarmEvent::Behaviour(MyBehaviourEvent::Identify(identify::Event::Received {
..
})) => {
swarm.behaviour_mut().rendezvous.register( swarm.behaviour_mut().rendezvous.register(
rendezvous::Namespace::from_static("rendezvous"), rendezvous::Namespace::from_static("rendezvous"),
rendezvous_point, rendezvous_point,
None, None,
); );
} }
SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Registered { SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(
rendezvous::client::Event::Registered {
namespace, namespace,
ttl, ttl,
rendezvous_node, rendezvous_node,
})) => { },
)) => {
log::info!( log::info!(
"Registered for namespace '{}' at rendezvous point {} for the next {} seconds", "Registered for namespace '{}' at rendezvous point {} for the next {} seconds",
namespace, namespace,
@ -94,13 +97,13 @@ async fn main() {
ttl ttl
); );
} }
SwarmEvent::Behaviour(MyEvent::Rendezvous( SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(
rendezvous::client::Event::RegisterFailed(error), rendezvous::client::Event::RegisterFailed(error),
)) => { )) => {
log::error!("Failed to register {}", error); log::error!("Failed to register {}", error);
return; return;
} }
SwarmEvent::Behaviour(MyEvent::Ping(ping::Event { SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping::Event {
peer, peer,
result: Ok(ping::Success::Ping { rtt }), result: Ok(ping::Success::Ping { rtt }),
})) if peer != rendezvous_point => { })) if peer != rendezvous_point => {
@ -113,44 +116,7 @@ async fn main() {
} }
} }
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum MyEvent {
Rendezvous(rendezvous::client::Event),
Identify(identify::Event),
Ping(ping::Event),
}
impl From<rendezvous::client::Event> for MyEvent {
fn from(event: rendezvous::client::Event) -> Self {
MyEvent::Rendezvous(event)
}
}
impl From<identify::Event> for MyEvent {
fn from(event: identify::Event) -> Self {
MyEvent::Identify(event)
}
}
impl From<ping::Event> for MyEvent {
fn from(event: ping::Event) -> Self {
MyEvent::Ping(event)
}
}
impl From<Void> for MyEvent {
fn from(event: Void) -> Self {
void::unreachable(event)
}
}
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(
out_event = "MyEvent",
event_process = false,
prelude = "libp2p_swarm::derive_prelude"
)]
struct MyBehaviour { struct MyBehaviour {
identify: identify::Behaviour, identify: identify::Behaviour,
rendezvous: rendezvous::client::Behaviour, rendezvous: rendezvous::client::Behaviour,

View File

@ -19,35 +19,36 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::StreamExt; use futures::StreamExt;
use libp2p_core::{identity, upgrade::Version, Multiaddr, PeerId, Transport}; use libp2p::{
use libp2p_ping as ping; core::transport::upgrade::Version,
use libp2p_rendezvous as rendezvous; identity, noise, ping, rendezvous,
use libp2p_swarm::AddressScore; swarm::{keep_alive, AddressScore, NetworkBehaviour, Swarm, SwarmEvent},
use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; tcp, yamux, Multiaddr, PeerId, Transport,
};
use std::time::Duration; use std::time::Duration;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
env_logger::init(); env_logger::init();
let key_pair = identity::Keypair::generate_ed25519();
let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::<Multiaddr>().unwrap(); let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::<Multiaddr>().unwrap();
let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN" let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN"
.parse() .parse()
.unwrap(); .unwrap();
let identity = identity::Keypair::generate_ed25519();
let mut swarm = Swarm::with_tokio_executor( let mut swarm = Swarm::with_tokio_executor(
libp2p_tcp::tokio::Transport::default() tcp::tokio::Transport::default()
.upgrade(Version::V1) .upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) .authenticate(noise::NoiseAuthenticated::xx(&key_pair).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default()) .multiplex(yamux::YamuxConfig::default())
.boxed(), .boxed(),
MyBehaviour { MyBehaviour {
rendezvous: rendezvous::client::Behaviour::new(identity.clone()), rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()),
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
keep_alive: keep_alive::Behaviour,
}, },
PeerId::from(identity.public()), PeerId::from(key_pair.public()),
); );
// In production the external address should be the publicly facing IP address of the rendezvous point. // In production the external address should be the publicly facing IP address of the rendezvous point.
@ -57,9 +58,7 @@ async fn main() {
log::info!("Local peer id: {}", swarm.local_peer_id()); log::info!("Local peer id: {}", swarm.local_peer_id());
let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); swarm.dial(rendezvous_point_address.clone()).unwrap();
swarm.dial(rendezvous_point_address).unwrap();
while let Some(event) = swarm.next().await { while let Some(event) = swarm.next().await {
match event { match event {
@ -82,11 +81,13 @@ async fn main() {
log::info!("Connection established with rendezvous point {}", peer_id); log::info!("Connection established with rendezvous point {}", peer_id);
} }
// once `/identify` did its job, we know our external address and can register // once `/identify` did its job, we know our external address and can register
SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Registered { SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(
rendezvous::client::Event::Registered {
namespace, namespace,
ttl, ttl,
rendezvous_node, rendezvous_node,
})) => { },
)) => {
log::info!( log::info!(
"Registered for namespace '{}' at rendezvous point {} for the next {} seconds", "Registered for namespace '{}' at rendezvous point {} for the next {} seconds",
namespace, namespace,
@ -94,13 +95,13 @@ async fn main() {
ttl ttl
); );
} }
SwarmEvent::Behaviour(MyEvent::Rendezvous( SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(
rendezvous::client::Event::RegisterFailed(error), rendezvous::client::Event::RegisterFailed(error),
)) => { )) => {
log::error!("Failed to register {}", error); log::error!("Failed to register {}", error);
return; return;
} }
SwarmEvent::Behaviour(MyEvent::Ping(ping::Event { SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping::Event {
peer, peer,
result: Ok(ping::Success::Ping { rtt }), result: Ok(ping::Success::Ping { rtt }),
})) if peer != rendezvous_point => { })) if peer != rendezvous_point => {
@ -113,31 +114,9 @@ async fn main() {
} }
} }
#[derive(Debug)]
enum MyEvent {
Rendezvous(rendezvous::client::Event),
Ping(ping::Event),
}
impl From<rendezvous::client::Event> for MyEvent {
fn from(event: rendezvous::client::Event) -> Self {
MyEvent::Rendezvous(event)
}
}
impl From<ping::Event> for MyEvent {
fn from(event: ping::Event) -> Self {
MyEvent::Ping(event)
}
}
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(
out_event = "MyEvent",
event_process = false,
prelude = "libp2p_swarm::derive_prelude"
)]
struct MyBehaviour { struct MyBehaviour {
rendezvous: rendezvous::client::Behaviour, rendezvous: rendezvous::client::Behaviour,
ping: ping::Behaviour, ping: ping::Behaviour,
keep_alive: keep_alive::Behaviour,
} }

View File

@ -18,53 +18,60 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::StreamExt;
use libp2p_core::{identity, upgrade::Version, PeerId, Transport};
use libp2p_identify as identify;
use libp2p_ping as ping;
use libp2p_rendezvous as rendezvous;
use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent};
use void::Void;
/// Examples for the rendezvous protocol: /// Examples for the rendezvous protocol:
/// ///
/// 1. Run the rendezvous server: /// 1. Run the rendezvous server:
/// RUST_LOG=info cargo run --example rendezvous_point /// ```
/// RUST_LOG=info cargo run --bin rendezvous-example
/// ```
/// 2. Register a peer: /// 2. Register a peer:
/// RUST_LOG=info cargo run --example register_with_identify /// ```
/// RUST_LOG=info cargo run --bin rzv-register
/// ```
/// 3. Try to discover the peer from (2): /// 3. Try to discover the peer from (2):
/// RUST_LOG=info cargo run --example discover /// ```
/// RUST_LOG=info cargo run --bin rzv-discover
/// ```
/// 4. Try to discover with identify:
/// ```
/// RUST_LOG=info cargo run --bin rzv-identify
/// ```
use futures::StreamExt;
use libp2p::{
core::transport::upgrade::Version,
identify, identity, noise, ping, rendezvous,
swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent},
tcp, yamux, PeerId, Transport,
};
use std::time::Duration;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
env_logger::init(); env_logger::init();
let bytes = [0u8; 32]; let key_pair = identity::Keypair::generate_ed25519();
let key = identity::ed25519::SecretKey::from_bytes(bytes).expect("we always pass 32 bytes");
let identity = identity::Keypair::Ed25519(key.into());
let mut swarm = Swarm::with_tokio_executor( let mut swarm = Swarm::with_tokio_executor(
libp2p_tcp::tokio::Transport::default() tcp::tokio::Transport::default()
.upgrade(Version::V1) .upgrade(Version::V1)
.authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) .authenticate(noise::NoiseAuthenticated::xx(&key_pair).unwrap())
.multiplex(libp2p_yamux::YamuxConfig::default()) .multiplex(yamux::YamuxConfig::default())
.boxed(), .boxed(),
MyBehaviour { MyBehaviour {
identify: identify::Behaviour::new(identify::Config::new( identify: identify::Behaviour::new(identify::Config::new(
"rendezvous-example/1.0.0".to_string(), "rendezvous-example/1.0.0".to_string(),
identity.public(), key_pair.public(),
)), )),
rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()),
ping: ping::Behaviour::new(ping::Config::new()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
keep_alive: keep_alive::Behaviour, keep_alive: keep_alive::Behaviour,
}, },
PeerId::from(identity.public()), PeerId::from(key_pair.public()),
); );
log::info!("Local peer id: {}", swarm.local_peer_id()); log::info!("Local peer id: {}", swarm.local_peer_id());
swarm let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap());
.listen_on("/ip4/0.0.0.0/tcp/62649".parse().unwrap())
.unwrap();
while let Some(event) = swarm.next().await { while let Some(event) = swarm.next().await {
match event { match event {
@ -74,7 +81,7 @@ async fn main() {
SwarmEvent::ConnectionClosed { peer_id, .. } => { SwarmEvent::ConnectionClosed { peer_id, .. } => {
log::info!("Disconnected from {}", peer_id); log::info!("Disconnected from {}", peer_id);
} }
SwarmEvent::Behaviour(MyEvent::Rendezvous( SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(
rendezvous::server::Event::PeerRegistered { peer, registration }, rendezvous::server::Event::PeerRegistered { peer, registration },
)) => { )) => {
log::info!( log::info!(
@ -83,7 +90,7 @@ async fn main() {
registration.namespace registration.namespace
); );
} }
SwarmEvent::Behaviour(MyEvent::Rendezvous( SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(
rendezvous::server::Event::DiscoverServed { rendezvous::server::Event::DiscoverServed {
enquirer, enquirer,
registrations, registrations,
@ -102,43 +109,7 @@ async fn main() {
} }
} }
#[derive(Debug)]
enum MyEvent {
Rendezvous(rendezvous::server::Event),
Ping(ping::Event),
Identify(identify::Event),
}
impl From<rendezvous::server::Event> for MyEvent {
fn from(event: rendezvous::server::Event) -> Self {
MyEvent::Rendezvous(event)
}
}
impl From<ping::Event> for MyEvent {
fn from(event: ping::Event) -> Self {
MyEvent::Ping(event)
}
}
impl From<identify::Event> for MyEvent {
fn from(event: identify::Event) -> Self {
MyEvent::Identify(event)
}
}
impl From<Void> for MyEvent {
fn from(event: Void) -> Self {
void::unreachable(event)
}
}
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
#[behaviour(
out_event = "MyEvent",
event_process = false,
prelude = "libp2p_swarm::derive_prelude"
)]
struct MyBehaviour { struct MyBehaviour {
identify: identify::Behaviour, identify: identify::Behaviour,
rendezvous: rendezvous::server::Behaviour, rendezvous: rendezvous::server::Behaviour,