examples/*: Migrate to async await (#2356)

* Adapt examples to async style loop
* Adapt async style loop for chat.rs
* Adapt async style loop for distributed-key-value-store.rs
* Adapt async style loop for gossibsub-chat.rs
* Adapt async style loop for ipfs-private.rs
* Adapt ping to use async
* Update tutorial crate to reflect new changes

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Gerardo Enrique Arriaga Rendon 2021-12-06 11:32:58 -05:00 committed by GitHub
parent 75ae7b0461
commit 2c75fbe812
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 124 additions and 184 deletions

View File

@ -50,7 +50,10 @@
//! The two nodes then connect.
use async_std::{io, task};
use futures::{future, prelude::*};
use futures::{
prelude::{stream::StreamExt, *},
select,
};
use libp2p::{
floodsub::{self, Floodsub, FloodsubEvent},
identity,
@ -58,10 +61,7 @@ use libp2p::{
swarm::SwarmEvent,
Multiaddr, NetworkBehaviour, PeerId, Swarm,
};
use std::{
error::Error,
task::{Context, Poll},
};
use std::error::Error;
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
@ -133,40 +133,34 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();
// Listen on all interfaces and whatever port the OS assigns
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => swarm
select! {
line = stdin.select_next_some() => swarm
.behaviour_mut()
.floodsub
.publish(floodsub_topic.clone(), line.as_bytes()),
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break,
}
}
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => {
.publish(floodsub_topic.clone(), line.expect("Stdin not to close").as_bytes()),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {:?}", address);
}
Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Floodsub(
FloodsubEvent::Message(message),
)))) => {
SwarmEvent::Behaviour(OutEvent::Floodsub(
FloodsubEvent::Message(message)
)) => {
println!(
"Received: '{:?}' from {:?}",
String::from_utf8_lossy(&message.data),
message.source
);
}
Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Mdns(
MdnsEvent::Discovered(list),
)))) => {
SwarmEvent::Behaviour(OutEvent::Mdns(
MdnsEvent::Discovered(list)
)) => {
for (peer, _) in list {
swarm
.behaviour_mut()
@ -174,9 +168,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
.add_node_to_partial_view(peer);
}
}
Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Expired(
list,
))))) => {
SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Expired(
list
))) => {
for (peer, _) in list {
if !swarm.behaviour_mut().mdns.has_node(&peer) {
swarm
@ -185,12 +179,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
.remove_node_from_partial_view(&peer);
}
}
}
Poll::Ready(Some(_)) => {}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
},
_ => {}
}
}
}
Poll::Pending
}))
}

View File

@ -41,7 +41,7 @@
//! 4. Close with Ctrl-c.
use async_std::{io, task};
use futures::prelude::*;
use futures::{prelude::*, select};
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{
record::Key, AddProviderOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult,
@ -53,10 +53,7 @@ use libp2p::{
swarm::{NetworkBehaviourEventProcess, SwarmEvent},
NetworkBehaviour, PeerId, Swarm,
};
use std::{
error::Error,
task::{Context, Poll},
};
use std::error::Error;
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
@ -157,35 +154,23 @@ async fn main() -> Result<(), Box<dyn Error>> {
};
// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();
// Listen on all interfaces and whatever port the OS assigns.
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off.
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => {
handle_input_line(&mut swarm.behaviour_mut().kademlia, line)
}
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break,
select! {
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening in {:?}", address);
},
_ => {}
}
}
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
if let SwarmEvent::NewListenAddr { address, .. } = event {
println!("Listening on {:?}", address);
}
}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
}
}
Poll::Pending
}))
}
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {

View File

@ -46,21 +46,18 @@
//!
//! The two nodes should then connect.
use async_std::{io, task};
use async_std::io;
use env_logger::{Builder, Env};
use futures::prelude::*;
use futures::{prelude::*, select};
use libp2p::gossipsub::MessageId;
use libp2p::gossipsub::{
GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity, ValidationMode,
};
use libp2p::{gossipsub, identity, swarm::SwarmEvent, Multiaddr, PeerId};
use std::collections::hash_map::DefaultHasher;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use std::{
error::Error,
task::{Context, Poll},
};
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
@ -130,25 +127,20 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();
// Kick it off
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
if let Err(e) = match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => swarm
select! {
line = stdin.select_next_some() => {
if let Err(e) = swarm
.behaviour_mut()
.publish(topic.clone(), line.as_bytes()),
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break,
} {
.publish(topic.clone(), line.expect("Stdin not to close").as_bytes())
{
println!("Publish error: {:?}", e);
}
}
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => match event {
},
event = swarm.select_next_some() => match event {
SwarmEvent::Behaviour(GossipsubEvent::Message {
propagation_source: peer_id,
message_id: id,
@ -163,11 +155,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Listening on {:?}", address);
}
_ => {}
},
Poll::Ready(None) | Poll::Pending => break,
}
}
Poll::Pending
}))
}
}

View File

@ -31,8 +31,8 @@
//!
//! You can ping this node, or use pubsub (gossipsub) on the topic "chat". For this
//! to work, the ipfs node needs to be configured to use gossipsub.
use async_std::{io, task};
use futures::{future, prelude::*};
use async_std::io;
use futures::{prelude::*, select};
use libp2p::{
core::{
either::EitherTransport, muxing::StreamMuxerBox, transport, transport::upgrade::Version,
@ -48,15 +48,7 @@ use libp2p::{
yamux::YamuxConfig,
Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport,
};
use std::{
env,
error::Error,
fs,
path::Path,
str::FromStr,
task::{Context, Poll},
time::Duration,
};
use std::{env, error::Error, fs, path::Path, str::FromStr, time::Duration};
/// Builds the transport that serves as a common ground for all connections.
pub fn build_transport(
@ -138,7 +130,8 @@ fn parse_legacy_multiaddr(text: &str) -> Result<Multiaddr, Box<dyn Error>> {
Ok(res)
}
fn main() -> Result<(), Box<dyn Error>> {
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
let ipfs_path: Box<Path> = get_ipfs_path();
@ -270,36 +263,28 @@ fn main() -> Result<(), Box<dyn Error>> {
}
// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();
// Listen on all interfaces and whatever port the OS assigns
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
if let Err(e) = match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => swarm
select! {
line = stdin.select_next_some() => {
if let Err(e) = swarm
.behaviour_mut()
.gossipsub
.publish(gossipsub_topic.clone(), line.as_bytes()),
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break,
} {
.publish(gossipsub_topic.clone(), line.expect("Stdin not to close").as_bytes())
{
println!("Publish error: {:?}", e);
}
}
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
},
event = swarm.select_next_some() => {
if let SwarmEvent::NewListenAddr { address, .. } = event {
println!("Listening on {:?}", address);
}
}
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
}
}
Poll::Pending
}))
}

View File

@ -40,19 +40,18 @@
//! The two nodes establish a connection, negotiate the ping protocol
//! and begin pinging each other.
use futures::executor::block_on;
use futures::prelude::*;
use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::{identity, ping, Multiaddr, PeerId};
use std::error::Error;
use std::task::Poll;
fn main() -> Result<(), Box<dyn Error>> {
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {:?}", local_peer_id);
let transport = block_on(libp2p::development_transport(local_key))?;
let transport = libp2p::development_transport(local_key).await?;
// Create a ping network behaviour.
//
@ -75,17 +74,11 @@ fn main() -> Result<(), Box<dyn Error>> {
println!("Dialed {}", addr)
}
block_on(future::poll_fn(move |cx| loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => match event {
loop {
match swarm.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address),
SwarmEvent::Behaviour(event) => println!("{:?}", event),
_ => {}
},
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
}
}));
Ok(())
}
}

View File

@ -39,7 +39,8 @@
//! 1. Creating a new crate: `cargo init rust-libp2p-tutorial`
//!
//! 2. Adding `libp2p` as well as `futures` as a dependency in the
//! `Cargo.toml` file:
//! `Cargo.toml` file. We will also include `async-std` with the
//! "attributes" feature to allow for an `async main`:
//!
//! ```yaml
//! [package]
@ -52,13 +53,15 @@
//! [dependencies]
//! libp2p = "<insert-current-version-here>"
//! futures = "<insert-current-version-here>"
//! async-std = { version = "<insert-current-version-here>", features = ["attributes"] }
//! ```
//!
//! ## Network identity
//!
//! With all the scaffolding in place, we can dive into the libp2p specifics. At
//! first we need to create a network identity for our local node in `fn
//! main()`. Identities in libp2p are handled via a public and private key pair.
//! first we need to create a network identity for our local node in `async fn
//! main()`, annotated with an attribute to allow `main` to be `async`.
//! Identities in libp2p are handled via a public and private key pair.
//! Nodes identify each other via their [`PeerId`](crate::PeerId) which is
//! derived from the public key.
//!
@ -66,7 +69,8 @@
//! use libp2p::{identity, PeerId};
//! use std::error::Error;
//!
//! fn main() -> Result<(), Box<dyn Error>> {
//! #[async_std::main]
//! async fn main() -> Result<(), Box<dyn Error>> {
//! let local_key = identity::Keypair::generate_ed25519();
//! let local_peer_id = PeerId::from(local_key.public());
//! println!("Local peer id: {:?}", local_peer_id);
@ -98,16 +102,16 @@
//! [`crate::core::muxing`] and [`yamux`](crate::yamux).
//!
//! ```rust
//! use futures::executor::block_on;
//! use libp2p::{identity, PeerId};
//! use std::error::Error;
//!
//! fn main() -> Result<(), Box<dyn Error>> {
//! #[async_std::main]
//! async fn main() -> Result<(), Box<dyn Error>> {
//! let local_key = identity::Keypair::generate_ed25519();
//! let local_peer_id = PeerId::from(local_key.public());
//! println!("Local peer id: {:?}", local_peer_id);
//!
//! let transport = block_on(libp2p::development_transport(local_key))?;
//! let transport = libp2p::development_transport(local_key).await?;
//!
//! Ok(())
//! }
@ -138,17 +142,17 @@
//! [`Ping`](crate::ping::Ping) [`NetworkBehaviour`] at the end:
//!
//! ```rust
//! use futures::executor::block_on;
//! use libp2p::{identity, PeerId};
//! use libp2p::ping::{Ping, PingConfig};
//! use std::error::Error;
//!
//! fn main() -> Result<(), Box<dyn Error>> {
//! #[async_std::main]
//! async fn main() -> Result<(), Box<dyn Error>> {
//! let local_key = identity::Keypair::generate_ed25519();
//! let local_peer_id = PeerId::from(local_key.public());
//! println!("Local peer id: {:?}", local_peer_id);
//!
//! let transport = block_on(libp2p::development_transport(local_key))?;
//! let transport = libp2p::development_transport(local_key).await?;
//!
//! // Create a ping network behaviour.
//! //
@ -171,18 +175,18 @@
//! [`Transport`] to the [`NetworkBehaviour`].
//!
//! ```rust
//! use futures::executor::block_on;
//! use libp2p::{identity, PeerId};
//! use libp2p::ping::{Ping, PingConfig};
//! use libp2p::swarm::Swarm;
//! use std::error::Error;
//!
//! fn main() -> Result<(), Box<dyn Error>> {
//! #[async_std::main]
//! async fn main() -> Result<(), Box<dyn Error>> {
//! let local_key = identity::Keypair::generate_ed25519();
//! let local_peer_id = PeerId::from(local_key.public());
//! println!("Local peer id: {:?}", local_peer_id);
//!
//! let transport = block_on(libp2p::development_transport(local_key))?;
//! let transport = libp2p::development_transport(local_key).await?;
//!
//! // Create a ping network behaviour.
//! //
@ -222,18 +226,18 @@
//! remote peer.
//!
//! ```rust
//! use futures::executor::block_on;
//! use libp2p::{identity, Multiaddr, PeerId};
//! use libp2p::ping::{Ping, PingConfig};
//! use libp2p::swarm::{Swarm, dial_opts::DialOpts};
//! use std::error::Error;
//!
//! fn main() -> Result<(), Box<dyn Error>> {
//! #[async_std::main]
//! async fn main() -> Result<(), Box<dyn Error>> {
//! let local_key = identity::Keypair::generate_ed25519();
//! let local_peer_id = PeerId::from(local_key.public());
//! println!("Local peer id: {:?}", local_peer_id);
//!
//! let transport = block_on(libp2p::development_transport(local_key))?;
//! let transport = libp2p::development_transport(local_key).await?;
//!
//! // Create a ping network behaviour.
//! //
@ -267,20 +271,19 @@
//! outgoing connection in case we specify an address on the CLI.
//!
//! ```no_run
//! use futures::executor::block_on;
//! use futures::prelude::*;
//! use libp2p::ping::{Ping, PingConfig};
//! use libp2p::swarm::{Swarm, SwarmEvent, dial_opts::DialOpts};
//! use libp2p::{identity, Multiaddr, PeerId};
//! use std::error::Error;
//! use std::task::Poll;
//!
//! fn main() -> Result<(), Box<dyn Error>> {
//! #[async_std::main]
//! async fn main() -> Result<(), Box<dyn Error>> {
//! let local_key = identity::Keypair::generate_ed25519();
//! let local_peer_id = PeerId::from(local_key.public());
//! println!("Local peer id: {:?}", local_peer_id);
//!
//! let transport = block_on(libp2p::development_transport(local_key))?;
//! let transport = libp2p::development_transport(local_key).await?;
//!
//! // Create a ping network behaviour.
//! //
@ -303,19 +306,14 @@
//! println!("Dialed {}", addr)
//! }
//!
//! block_on(future::poll_fn(move |cx| loop {
//! match swarm.poll_next_unpin(cx) {
//! Poll::Ready(Some(event)) => match event {
//! loop {
//! match swarm.select_next_some().await {
//! SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address),
//! SwarmEvent::Behaviour(event) => println!("{:?}", event),
//! _ => {}
//! },
//! Poll::Ready(None) => return Poll::Ready(()),
//! Poll::Pending => return Poll::Pending
//! }
//! }));
//! }
//!
//! Ok(())
//! }
//! ```
//!