diff --git a/examples/chat.rs b/examples/chat.rs index bdafde20..907b43b1 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -50,7 +50,10 @@ //! The two nodes then connect. use async_std::{io, task}; -use futures::{future, prelude::*}; +use futures::{ + prelude::{stream::StreamExt, *}, + select, +}; use libp2p::{ floodsub::{self, Floodsub, FloodsubEvent}, identity, @@ -58,10 +61,7 @@ use libp2p::{ swarm::SwarmEvent, Multiaddr, NetworkBehaviour, PeerId, Swarm, }; -use std::{ - error::Error, - task::{Context, Poll}, -}; +use std::error::Error; #[async_std::main] async fn main() -> Result<(), Box> { @@ -133,40 +133,34 @@ async fn main() -> Result<(), Box> { } // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines(); + let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); // Listen on all interfaces and whatever port the OS assigns swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off - task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { - loop { - match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => swarm - .behaviour_mut() - .floodsub - .publish(floodsub_topic.clone(), line.as_bytes()), - Poll::Ready(None) => panic!("Stdin closed"), - Poll::Pending => break, - } - } - loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => { + loop { + select! { + line = stdin.select_next_some() => swarm + .behaviour_mut() + .floodsub + .publish(floodsub_topic.clone(), line.expect("Stdin not to close").as_bytes()), + event = swarm.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. } => { println!("Listening on {:?}", address); } - Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Floodsub( - FloodsubEvent::Message(message), - )))) => { + SwarmEvent::Behaviour(OutEvent::Floodsub( + FloodsubEvent::Message(message) + )) => { println!( "Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source ); } - Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Mdns( - MdnsEvent::Discovered(list), - )))) => { + SwarmEvent::Behaviour(OutEvent::Mdns( + MdnsEvent::Discovered(list) + )) => { for (peer, _) in list { swarm .behaviour_mut() @@ -174,9 +168,9 @@ async fn main() -> Result<(), Box> { .add_node_to_partial_view(peer); } } - Poll::Ready(Some(SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Expired( - list, - ))))) => { + SwarmEvent::Behaviour(OutEvent::Mdns(MdnsEvent::Expired( + list + ))) => { for (peer, _) in list { if !swarm.behaviour_mut().mdns.has_node(&peer) { swarm @@ -185,12 +179,9 @@ async fn main() -> Result<(), Box> { .remove_node_from_partial_view(&peer); } } - } - Poll::Ready(Some(_)) => {} - Poll::Ready(None) => return Poll::Ready(Ok(())), - Poll::Pending => break, + }, + _ => {} } } - Poll::Pending - })) + } } diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index cea3ca2a..3f92ebbe 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -41,7 +41,7 @@ //! 4. Close with Ctrl-c. use async_std::{io, task}; -use futures::prelude::*; +use futures::{prelude::*, select}; use libp2p::kad::record::store::MemoryStore; use libp2p::kad::{ record::Key, AddProviderOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult, @@ -53,10 +53,7 @@ use libp2p::{ swarm::{NetworkBehaviourEventProcess, SwarmEvent}, NetworkBehaviour, PeerId, Swarm, }; -use std::{ - error::Error, - task::{Context, Poll}, -}; +use std::error::Error; #[async_std::main] async fn main() -> Result<(), Box> { @@ -157,35 +154,23 @@ async fn main() -> Result<(), Box> { }; // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines(); + let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); // Listen on all interfaces and whatever port the OS assigns. swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off. - task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { - loop { - match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => { - handle_input_line(&mut swarm.behaviour_mut().kademlia, line) - } - Poll::Ready(None) => panic!("Stdin closed"), - Poll::Pending => break, + loop { + select! { + line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")), + event = swarm.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. } => { + println!("Listening in {:?}", address); + }, + _ => {} } } - loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => { - if let SwarmEvent::NewListenAddr { address, .. } = event { - println!("Listening on {:?}", address); - } - } - Poll::Ready(None) => return Poll::Ready(Ok(())), - Poll::Pending => break, - } - } - Poll::Pending - })) + } } fn handle_input_line(kademlia: &mut Kademlia, line: String) { diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index fdd477bf..976fcafb 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -46,21 +46,18 @@ //! //! The two nodes should then connect. -use async_std::{io, task}; +use async_std::io; use env_logger::{Builder, Env}; -use futures::prelude::*; +use futures::{prelude::*, select}; use libp2p::gossipsub::MessageId; use libp2p::gossipsub::{ GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity, ValidationMode, }; use libp2p::{gossipsub, identity, swarm::SwarmEvent, Multiaddr, PeerId}; use std::collections::hash_map::DefaultHasher; +use std::error::Error; use std::hash::{Hash, Hasher}; use std::time::Duration; -use std::{ - error::Error, - task::{Context, Poll}, -}; #[async_std::main] async fn main() -> Result<(), Box> { @@ -130,44 +127,35 @@ async fn main() -> Result<(), Box> { } // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines(); + let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); // Kick it off - task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { - loop { - if let Err(e) = match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => swarm + loop { + select! { + line = stdin.select_next_some() => { + if let Err(e) = swarm .behaviour_mut() - .publish(topic.clone(), line.as_bytes()), - Poll::Ready(None) => panic!("Stdin closed"), - Poll::Pending => break, - } { - println!("Publish error: {:?}", e); + .publish(topic.clone(), line.expect("Stdin not to close").as_bytes()) + { + println!("Publish error: {:?}", e); + } + }, + event = swarm.select_next_some() => match event { + SwarmEvent::Behaviour(GossipsubEvent::Message { + propagation_source: peer_id, + message_id: id, + message, + }) => println!( + "Got message: {} with id: {} from peer: {:?}", + String::from_utf8_lossy(&message.data), + id, + peer_id + ), + SwarmEvent::NewListenAddr { address, .. } => { + println!("Listening on {:?}", address); + } + _ => {} } } - - loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => match event { - SwarmEvent::Behaviour(GossipsubEvent::Message { - propagation_source: peer_id, - message_id: id, - message, - }) => println!( - "Got message: {} with id: {} from peer: {:?}", - String::from_utf8_lossy(&message.data), - id, - peer_id - ), - SwarmEvent::NewListenAddr { address, .. } => { - println!("Listening on {:?}", address); - } - _ => {} - }, - Poll::Ready(None) | Poll::Pending => break, - } - } - - Poll::Pending - })) + } } diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index 4b44ad3f..fdeed494 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -31,8 +31,8 @@ //! //! You can ping this node, or use pubsub (gossipsub) on the topic "chat". For this //! to work, the ipfs node needs to be configured to use gossipsub. -use async_std::{io, task}; -use futures::{future, prelude::*}; +use async_std::io; +use futures::{prelude::*, select}; use libp2p::{ core::{ either::EitherTransport, muxing::StreamMuxerBox, transport, transport::upgrade::Version, @@ -48,15 +48,7 @@ use libp2p::{ yamux::YamuxConfig, Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport, }; -use std::{ - env, - error::Error, - fs, - path::Path, - str::FromStr, - task::{Context, Poll}, - time::Duration, -}; +use std::{env, error::Error, fs, path::Path, str::FromStr, time::Duration}; /// Builds the transport that serves as a common ground for all connections. pub fn build_transport( @@ -138,7 +130,8 @@ fn parse_legacy_multiaddr(text: &str) -> Result> { Ok(res) } -fn main() -> Result<(), Box> { +#[async_std::main] +async fn main() -> Result<(), Box> { env_logger::init(); let ipfs_path: Box = get_ipfs_path(); @@ -270,36 +263,28 @@ fn main() -> Result<(), Box> { } // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines(); + let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); // Listen on all interfaces and whatever port the OS assigns swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off - task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { - loop { - if let Err(e) = match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => swarm + loop { + select! { + line = stdin.select_next_some() => { + if let Err(e) = swarm .behaviour_mut() .gossipsub - .publish(gossipsub_topic.clone(), line.as_bytes()), - Poll::Ready(None) => panic!("Stdin closed"), - Poll::Pending => break, - } { - println!("Publish error: {:?}", e); - } - } - loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => { - if let SwarmEvent::NewListenAddr { address, .. } = event { - println!("Listening on {:?}", address); - } + .publish(gossipsub_topic.clone(), line.expect("Stdin not to close").as_bytes()) + { + println!("Publish error: {:?}", e); + } + }, + event = swarm.select_next_some() => { + if let SwarmEvent::NewListenAddr { address, .. } = event { + println!("Listening on {:?}", address); } - Poll::Ready(None) => return Poll::Ready(Ok(())), - Poll::Pending => break, } } - Poll::Pending - })) + } } diff --git a/examples/ping.rs b/examples/ping.rs index 6a4523b4..26223459 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -40,19 +40,18 @@ //! The two nodes establish a connection, negotiate the ping protocol //! and begin pinging each other. -use futures::executor::block_on; use futures::prelude::*; use libp2p::swarm::{Swarm, SwarmEvent}; use libp2p::{identity, ping, Multiaddr, PeerId}; use std::error::Error; -use std::task::Poll; -fn main() -> Result<(), Box> { +#[async_std::main] +async fn main() -> Result<(), Box> { let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); println!("Local peer id: {:?}", local_peer_id); - let transport = block_on(libp2p::development_transport(local_key))?; + let transport = libp2p::development_transport(local_key).await?; // Create a ping network behaviour. // @@ -75,17 +74,11 @@ fn main() -> Result<(), Box> { println!("Dialed {}", addr) } - block_on(future::poll_fn(move |cx| loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => match event { - SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address), - SwarmEvent::Behaviour(event) => println!("{:?}", event), - _ => {} - }, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => return Poll::Pending, + loop { + match swarm.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address), + SwarmEvent::Behaviour(event) => println!("{:?}", event), + _ => {} } - })); - - Ok(()) + } } diff --git a/src/tutorial.rs b/src/tutorial.rs index 2f76c5c2..bbc7f50d 100644 --- a/src/tutorial.rs +++ b/src/tutorial.rs @@ -39,7 +39,8 @@ //! 1. Creating a new crate: `cargo init rust-libp2p-tutorial` //! //! 2. Adding `libp2p` as well as `futures` as a dependency in the -//! `Cargo.toml` file: +//! `Cargo.toml` file. We will also include `async-std` with the +//! "attributes" feature to allow for an `async main`: //! //! ```yaml //! [package] @@ -52,13 +53,15 @@ //! [dependencies] //! libp2p = "" //! futures = "" +//! async-std = { version = "", features = ["attributes"] } //! ``` //! //! ## Network identity //! //! With all the scaffolding in place, we can dive into the libp2p specifics. At -//! first we need to create a network identity for our local node in `fn -//! main()`. Identities in libp2p are handled via a public and private key pair. +//! first we need to create a network identity for our local node in `async fn +//! main()`, annotated with an attribute to allow `main` to be `async`. +//! Identities in libp2p are handled via a public and private key pair. //! Nodes identify each other via their [`PeerId`](crate::PeerId) which is //! derived from the public key. //! @@ -66,7 +69,8 @@ //! use libp2p::{identity, PeerId}; //! use std::error::Error; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); @@ -98,16 +102,16 @@ //! [`crate::core::muxing`] and [`yamux`](crate::yamux). //! //! ```rust -//! use futures::executor::block_on; //! use libp2p::{identity, PeerId}; //! use std::error::Error; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); //! -//! let transport = block_on(libp2p::development_transport(local_key))?; +//! let transport = libp2p::development_transport(local_key).await?; //! //! Ok(()) //! } @@ -138,17 +142,17 @@ //! [`Ping`](crate::ping::Ping) [`NetworkBehaviour`] at the end: //! //! ```rust -//! use futures::executor::block_on; //! use libp2p::{identity, PeerId}; //! use libp2p::ping::{Ping, PingConfig}; //! use std::error::Error; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); //! -//! let transport = block_on(libp2p::development_transport(local_key))?; +//! let transport = libp2p::development_transport(local_key).await?; //! //! // Create a ping network behaviour. //! // @@ -171,18 +175,18 @@ //! [`Transport`] to the [`NetworkBehaviour`]. //! //! ```rust -//! use futures::executor::block_on; //! use libp2p::{identity, PeerId}; //! use libp2p::ping::{Ping, PingConfig}; //! use libp2p::swarm::Swarm; //! use std::error::Error; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); //! -//! let transport = block_on(libp2p::development_transport(local_key))?; +//! let transport = libp2p::development_transport(local_key).await?; //! //! // Create a ping network behaviour. //! // @@ -222,18 +226,18 @@ //! remote peer. //! //! ```rust -//! use futures::executor::block_on; //! use libp2p::{identity, Multiaddr, PeerId}; //! use libp2p::ping::{Ping, PingConfig}; //! use libp2p::swarm::{Swarm, dial_opts::DialOpts}; //! use std::error::Error; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); //! -//! let transport = block_on(libp2p::development_transport(local_key))?; +//! let transport = libp2p::development_transport(local_key).await?; //! //! // Create a ping network behaviour. //! // @@ -267,20 +271,19 @@ //! outgoing connection in case we specify an address on the CLI. //! //! ```no_run -//! use futures::executor::block_on; //! use futures::prelude::*; //! use libp2p::ping::{Ping, PingConfig}; //! use libp2p::swarm::{Swarm, SwarmEvent, dial_opts::DialOpts}; //! use libp2p::{identity, Multiaddr, PeerId}; //! use std::error::Error; -//! use std::task::Poll; //! -//! fn main() -> Result<(), Box> { +//! #[async_std::main] +//! async fn main() -> Result<(), Box> { //! let local_key = identity::Keypair::generate_ed25519(); //! let local_peer_id = PeerId::from(local_key.public()); //! println!("Local peer id: {:?}", local_peer_id); //! -//! let transport = block_on(libp2p::development_transport(local_key))?; +//! let transport = libp2p::development_transport(local_key).await?; //! //! // Create a ping network behaviour. //! // @@ -303,19 +306,14 @@ //! println!("Dialed {}", addr) //! } //! -//! block_on(future::poll_fn(move |cx| loop { -//! match swarm.poll_next_unpin(cx) { -//! Poll::Ready(Some(event)) => match event { -//! SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address), -//! SwarmEvent::Behaviour(event) => println!("{:?}", event), -//! _ => {} -//! }, -//! Poll::Ready(None) => return Poll::Ready(()), -//! Poll::Pending => return Poll::Pending +//! loop { +//! match swarm.select_next_some().await { +//! SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {:?}", address), +//! SwarmEvent::Behaviour(event) => println!("{:?}", event), +//! _ => {} //! } -//! })); +//! } //! -//! Ok(()) //! } //! ``` //!