diff --git a/Cargo.toml b/Cargo.toml index a9722817..e8c44fe1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,9 +45,8 @@ libp2p-tcp = { version = "0.13.0", path = "transports/tcp" } libp2p-websocket = { version = "0.13.0", path = "transports/websocket", optional = true } [dev-dependencies] +async-std = "1.0" env_logger = "0.7.1" -tokio = "0.1" -tokio-stdin-stdout = "0.1" [workspace] members = [ diff --git a/examples/chat.rs b/examples/chat.rs index 183973ae..4ff6af1a 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -49,20 +49,21 @@ //! //! The two nodes then connect. -use futures::prelude::*; +use async_std::{io, task}; +use futures::{future, prelude::*}; use libp2p::{ + Multiaddr, PeerId, Swarm, NetworkBehaviour, identity, - tokio_codec::{FramedRead, LinesCodec}, - tokio_io::{AsyncRead, AsyncWrite}, floodsub::{self, Floodsub, FloodsubEvent}, mdns::{Mdns, MdnsEvent}, swarm::NetworkBehaviourEventProcess }; +use std::{error::Error, task::{Context, Poll}}; -fn main() { +fn main() -> Result<(), Box> { env_logger::init(); // Create a random PeerId @@ -71,7 +72,7 @@ fn main() { println!("Local peer id: {:?}", local_peer_id); // Set up a an encrypted DNS-enabled TCP Transport over the Mplex and Yamux protocols - let transport = libp2p::build_development_transport(local_key); + let transport = libp2p::build_development_transport(local_key)?; // Create a Floodsub topic let floodsub_topic = floodsub::TopicBuilder::new("chat").build(); @@ -87,18 +88,16 @@ fn main() { impl NetworkBehaviourEventProcess for MyBehaviour { fn inject_event(&mut self, event: MdnsEvent) { match event { - MdnsEvent::Discovered(list) => { + MdnsEvent::Discovered(list) => for (peer, _) in list { self.floodsub.add_node_to_partial_view(peer); } - }, - MdnsEvent::Expired(list) => { + MdnsEvent::Expired(list) => for (peer, _) in list { if !self.mdns.has_node(&peer) { self.floodsub.remove_node_from_partial_view(&peer); } } - } } } } @@ -114,9 +113,10 @@ fn main() { // Create a Swarm to manage peers and events let mut swarm = { + let mdns = task::block_on(Mdns::new())?; let mut behaviour = MyBehaviour { floodsub: Floodsub::new(local_peer_id.clone()), - mdns: Mdns::new().expect("Failed to create mDNS service"), + mdns }; behaviour.floodsub.subscribe(floodsub_topic.clone()); @@ -125,42 +125,32 @@ fn main() { // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { - let dialing = to_dial.clone(); - match to_dial.parse() { - Ok(to_dial) => { - match libp2p::Swarm::dial_addr(&mut swarm, to_dial) { - Ok(_) => println!("Dialed {:?}", dialing), - Err(e) => println!("Dial {:?} failed: {:?}", dialing, e) - } - }, - Err(err) => println!("Failed to parse address to dial: {:?}", err), - } + let addr: Multiaddr = to_dial.parse()?; + Swarm::dial_addr(&mut swarm, addr)?; + println!("Dialed {:?}", to_dial) } // Read full lines from stdin - let stdin = tokio_stdin_stdout::stdin(0); - let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); + let mut stdin = io::BufReader::new(io::stdin()).lines(); // Listen on all interfaces and whatever port the OS assigns - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); + Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off let mut listening = false; - tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { + task::block_on(future::poll_fn(move |cx: &mut Context| { loop { - match framed_stdin.poll().expect("Error while polling stdin") { - Async::Ready(Some(line)) => swarm.floodsub.publish(&floodsub_topic, line.as_bytes()), - Async::Ready(None) => panic!("Stdin closed"), - Async::NotReady => break, - }; + match stdin.try_poll_next_unpin(cx)? { + Poll::Ready(Some(line)) => swarm.floodsub.publish(&floodsub_topic, line.as_bytes()), + Poll::Ready(None) => panic!("Stdin closed"), + Poll::Pending => break + } } - loop { - match swarm.poll().expect("Error while polling swarm") { - Async::Ready(Some(_)) => { - - }, - Async::Ready(None) | Async::NotReady => { + match swarm.poll_next_unpin(cx) { + Poll::Ready(Some(event)) => println!("{:?}", event), + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => { if !listening { if let Some(a) = Swarm::listeners(&swarm).next() { println!("Listening on {:?}", a); @@ -171,7 +161,6 @@ fn main() { } } } - - Ok(Async::NotReady) - })); + Poll::Pending + })) } diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store.rs index d8f649d8..84c16c15 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store.rs @@ -29,19 +29,22 @@ //! //! 4. Close with Ctrl-c. +use async_std::{io, task}; use futures::prelude::*; use libp2p::kad::record::store::MemoryStore; use libp2p::kad::{record::Key, Kademlia, KademliaEvent, PutRecordOk, Quorum, Record}; use libp2p::{ - build_development_transport, identity, + NetworkBehaviour, + PeerId, + Swarm, + build_development_transport, + identity, mdns::{Mdns, MdnsEvent}, - swarm::NetworkBehaviourEventProcess, - tokio_codec::{FramedRead, LinesCodec}, - tokio_io::{AsyncRead, AsyncWrite}, - NetworkBehaviour, PeerId, Swarm, + swarm::NetworkBehaviourEventProcess }; +use std::{error::Error, task::{Context, Poll}}; -fn main() { +fn main() -> Result<(), Box> { env_logger::init(); // Create a random key for ourselves. @@ -49,17 +52,18 @@ fn main() { let local_peer_id = PeerId::from(local_key.public()); // Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol. - let transport = build_development_transport(local_key); + let transport = build_development_transport(local_key)?; // We create a custom network behaviour that combines Kademlia and mDNS. #[derive(NetworkBehaviour)] struct MyBehaviour { kademlia: Kademlia, - mdns: Mdns, + mdns: Mdns } - impl NetworkBehaviourEventProcess - for MyBehaviour + impl NetworkBehaviourEventProcess for MyBehaviour + where + T: AsyncRead + AsyncWrite { // Called when `mdns` produces an event. fn inject_event(&mut self, event: MdnsEvent) { @@ -71,8 +75,9 @@ fn main() { } } - impl NetworkBehaviourEventProcess - for MyBehaviour + impl NetworkBehaviourEventProcess for MyBehaviour + where + T: AsyncRead + AsyncWrite { // Called when `kademlia` produces an event. fn inject_event(&mut self, message: KademliaEvent) { @@ -108,58 +113,50 @@ fn main() { // Create a Kademlia behaviour. let store = MemoryStore::new(local_peer_id.clone()); let kademlia = Kademlia::new(local_peer_id.clone(), store); - - let behaviour = MyBehaviour { - kademlia, - mdns: Mdns::new().expect("Failed to create mDNS service"), - }; - + let mdns = task::block_on(Mdns::new())?; + let behaviour = MyBehaviour { kademlia, mdns }; Swarm::new(transport, behaviour, local_peer_id) }; - // Read full lines from stdin. - let stdin = tokio_stdin_stdout::stdin(0); - let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); + // Read full lines from stdin + let mut stdin = io::BufReader::new(io::stdin()).lines(); // Listen on all interfaces and whatever port the OS assigns. - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); + Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; // Kick it off. let mut listening = false; - tokio::run(futures::future::poll_fn(move || { + task::block_on(future::poll_fn(move |cx: &mut Context| { loop { - match framed_stdin.poll().expect("Error while polling stdin") { - Async::Ready(Some(line)) => { - handle_input_line(&mut swarm.kademlia, line); - } - Async::Ready(None) => panic!("Stdin closed"), - Async::NotReady => break, - }; + match stdin.try_poll_next_unpin(cx)? { + Poll::Ready(Some(line)) => handle_input_line(&mut swarm.kademlia, line), + Poll::Ready(None) => panic!("Stdin closed"), + Poll::Pending => break + } } - loop { - match swarm.poll().expect("Error while polling swarm") { - Async::Ready(Some(_)) => {} - Async::Ready(None) | Async::NotReady => { + match swarm.poll_next_unpin(cx) { + Poll::Ready(Some(event)) => println!("{:?}", event), + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => { if !listening { if let Some(a) = Swarm::listeners(&swarm).next() { println!("Listening on {:?}", a); listening = true; } } - break; + break } } } - - Ok(Async::NotReady) - })); + Poll::Pending + })) } -fn handle_input_line( - kademlia: &mut Kademlia, - line: String, -) { +fn handle_input_line(kademlia: &mut Kademlia, line: String) +where + T: AsyncRead + AsyncWrite +{ let mut args = line.split(" "); match args.next() { diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index 7ee1f88e..326e6f57 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -23,6 +23,7 @@ //! You can pass as parameter a base58 peer ID to search for. If you don't pass any parameter, a //! peer ID will be generated randomly. +use async_std::task; use futures::prelude::*; use libp2p::{ Swarm, @@ -32,10 +33,9 @@ use libp2p::{ }; use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, GetClosestPeersError}; use libp2p::kad::record::store::MemoryStore; -use std::env; -use std::time::Duration; +use std::{env, error::Error, time::Duration}; -fn main() { +fn main() -> Result<(), Box> { env_logger::init(); // Create a random key for ourselves. @@ -43,7 +43,7 @@ fn main() { let local_peer_id = PeerId::from(local_key.public()); // Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol - let transport = build_development_transport(local_key); + let transport = build_development_transport(local_key)?; // Create a swarm to manage peers and events. let mut swarm = { @@ -60,7 +60,7 @@ fn main() { behaviour.add_address(&"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());*/ // The only address that currently works. - behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap()); + behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse()?, "/ip4/104.131.131.82/tcp/4001".parse()?); // The following addresses always fail signature verification, possibly due to // RSA keys with < 2048 bits. @@ -80,7 +80,7 @@ fn main() { // Order Kademlia to search for a peer. let to_search: PeerId = if let Some(peer_id) = env::args().nth(1) { - peer_id.parse().expect("Failed to parse peer ID to find") + peer_id.parse()? } else { identity::Keypair::generate_ed25519().public().into() }; @@ -89,38 +89,29 @@ fn main() { swarm.get_closest_peers(to_search); // Kick it off! - tokio::run(futures::future::poll_fn(move || { - loop { - match swarm.poll().expect("Error while polling swarm") { - Async::Ready(Some(KademliaEvent::GetClosestPeersResult(res))) => { - match res { - Ok(ok) => { - if !ok.peers.is_empty() { - println!("Query finished with closest peers: {:#?}", ok.peers); - return Ok(Async::Ready(())); - } else { - // The example is considered failed as there - // should always be at least 1 reachable peer. - panic!("Query finished with no closest peers."); - } + task::block_on(async move { + while let Some(event) = swarm.try_next().await? { + if let KademliaEvent::GetClosestPeersResult(result) = event { + match result { + Ok(ok) => + if !ok.peers.is_empty() { + println!("Query finished with closest peers: {:#?}", ok.peers) + } else { + // The example is considered failed as there + // should always be at least 1 reachable peer. + panic!("Query finished with no closest peers.") } - Err(GetClosestPeersError::Timeout { peers, .. }) => { - if !peers.is_empty() { - println!("Query timed out with closest peers: {:#?}", peers); - return Ok(Async::Ready(())); - } else { - // The example is considered failed as there - // should always be at least 1 reachable peer. - panic!("Query timed out with no closest peers."); - } + Err(GetClosestPeersError::Timeout { peers, .. }) => + if !peers.is_empty() { + println!("Query timed out with closest peers: {:#?}", peers) + } else { + // The example is considered failed as there + // should always be at least 1 reachable peer. + panic!("Query timed out with no closest peers."); } - } - }, - Async::Ready(Some(_)) => {}, - Async::Ready(None) | Async::NotReady => break, + } } } - - Ok(Async::NotReady) - })); + Ok(()) + }) } diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs index 32c760e9..a8f4323a 100644 --- a/examples/mdns-passive-discovery.rs +++ b/examples/mdns-passive-discovery.rs @@ -18,26 +18,17 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::prelude::*; +use async_std::task; use libp2p::mdns::service::{MdnsPacket, MdnsService}; -use std::io; +use std::error::Error; -fn main() { - // This example provides passive discovery of the libp2p nodes on the network that send - // mDNS queries and answers. - - // We start by creating the service. - let mut service = MdnsService::new().expect("Error while creating mDNS service"); - - // Create a never-ending `Future` that polls the service for events. - let future = futures::future::poll_fn(move || -> Poll<(), io::Error> { +fn main() -> Result<(), Box> { + // This example provides passive discovery of the libp2p nodes on the + // network that send mDNS queries and answers. + task::block_on(async move { + let mut service = MdnsService::new().await?; loop { - // Grab the next available packet from the service. - let packet = match service.poll() { - Async::Ready(packet) => packet, - Async::NotReady => return Ok(Async::NotReady), - }; - + let (srv, packet) = service.next().await; match packet { MdnsPacket::Query(query) => { // We detected a libp2p mDNS query on the network. In a real application, you @@ -63,9 +54,7 @@ fn main() { println!("Detected service query from {:?}", query.remote_addr()); } } + service = srv } - }); - - // Blocks the thread until the future runs to completion (which will never happen). - tokio::run(future.map_err(|err| panic!("{:?}", err))); + }) } diff --git a/examples/ping.rs b/examples/ping.rs index a8a6981b..aa9e1f8d 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -38,11 +38,12 @@ //! The two nodes establish a connection, negotiate the ping protocol //! and begin pinging each other. -use futures::{prelude::*, future}; -use libp2p::{ identity, PeerId, ping::{Ping, PingConfig}, Swarm }; -use std::env; +use async_std::task; +use futures::{future, prelude::*}; +use libp2p::{identity, PeerId, ping::{Ping, PingConfig}, Swarm}; +use std::{error::Error, task::{Context, Poll}}; -fn main() { +fn main() -> Result<(), Box> { env_logger::init(); // Create a random PeerId. @@ -51,7 +52,7 @@ fn main() { println!("Local peer id: {:?}", peer_id); // Create a transport. - let transport = libp2p::build_development_transport(id_keys); + let transport = libp2p::build_development_transport(id_keys)?; // Create a ping network behaviour. // @@ -66,38 +67,33 @@ fn main() { // Dial the peer identified by the multi-address given as the second // command-line argument, if any. - if let Some(addr) = env::args().nth(1) { - let remote_addr = addr.clone(); - match addr.parse() { - Ok(remote) => { - match Swarm::dial_addr(&mut swarm, remote) { - Ok(()) => println!("Dialed {:?}", remote_addr), - Err(e) => println!("Dialing {:?} failed with: {:?}", remote_addr, e) - } - }, - Err(err) => println!("Failed to parse address to dial: {:?}", err), - } + if let Some(addr) = std::env::args().nth(1) { + let remote = addr.parse()?; + Swarm::dial_addr(&mut swarm, remote)?; + println!("Dialed {}", addr) } // Tell the swarm to listen on all interfaces and a random, OS-assigned port. - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); + Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; - // Use tokio to drive the `Swarm`. let mut listening = false; - tokio::run(future::poll_fn(move || -> Result<_, ()> { + task::block_on(future::poll_fn(move |cx: &mut Context| { loop { - match swarm.poll().expect("Error while polling swarm") { - Async::Ready(Some(e)) => println!("{:?}", e), - Async::Ready(None) | Async::NotReady => { + match swarm.poll_next_unpin(cx) { + Poll::Ready(Some(event)) => println!("{:?}", event), + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => { if !listening { - if let Some(a) = Swarm::listeners(&swarm).next() { - println!("Listening on {:?}", a); + for addr in Swarm::listeners(&swarm) { + println!("Listening on {}", addr); listening = true; } } - return Ok(Async::NotReady) + return Poll::Pending } } } })); + + Ok(()) } diff --git a/misc/core-derive/Cargo.toml b/misc/core-derive/Cargo.toml index da21dab1..9c45a821 100644 --- a/misc/core-derive/Cargo.toml +++ b/misc/core-derive/Cargo.toml @@ -13,8 +13,8 @@ categories = ["network-programming", "asynchronous"] proc-macro = true [dependencies] -syn = { version = "0.15.22", default-features = false, features = ["clone-impls", "derive", "parsing", "printing", "proc-macro"] } -quote = "0.6" +syn = { version = "1.0.8", default-features = false, features = ["clone-impls", "derive", "parsing", "printing", "proc-macro"] } +quote = "1.0" [dev-dependencies] libp2p = { version = "0.13.0", path = "../.." } diff --git a/misc/core-derive/src/lib.rs b/misc/core-derive/src/lib.rs index da45329e..baae0cd8 100644 --- a/misc/core-derive/src/lib.rs +++ b/misc/core-derive/src/lib.rs @@ -96,8 +96,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { }) .collect::>(); - additional.push(quote!{#substream_generic: ::libp2p::tokio_io::AsyncRead}); - additional.push(quote!{#substream_generic: ::libp2p::tokio_io::AsyncWrite}); + additional.push(quote!{#substream_generic: ::libp2p::futures::io::AsyncRead}); + additional.push(quote!{#substream_generic: ::libp2p::futures::io::AsyncWrite}); + additional.push(quote!{#substream_generic: Unpin}); if let Some(where_clause) = where_clause { if where_clause.predicates.trailing_punct() { @@ -118,7 +119,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { for meta_items in ast.attrs.iter().filter_map(get_meta_items) { for meta_item in meta_items { match meta_item { - syn::NestedMeta::Meta(syn::Meta::NameValue(ref m)) if m.ident == "out_event" => { + syn::NestedMeta::Meta(syn::Meta::NameValue(ref m)) if m.path.is_ident("out_event") => { if let syn::Lit::Str(ref s) = m.lit { let ident: syn::Type = syn::parse_str(&s.value()).unwrap(); out = quote!{#ident}; @@ -381,11 +382,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { // If we find a `#[behaviour(poll_method = "poll")]` attribute on the struct, we call // `self.poll()` at the end of the polling. let poll_method = { - let mut poll_method = quote!{Poll::Pending}; + let mut poll_method = quote!{std::task::Poll::Pending}; for meta_items in ast.attrs.iter().filter_map(get_meta_items) { for meta_item in meta_items { match meta_item { - syn::NestedMeta::Meta(syn::Meta::NameValue(ref m)) if m.ident == "poll_method" => { + syn::NestedMeta::Meta(syn::Meta::NameValue(ref m)) if m.path.is_ident("poll_method") => { if let syn::Lit::Str(ref s) = m.lit { let ident: Ident = syn::parse_str(&s.value()).unwrap(); poll_method = quote!{#name::#ident(self)}; @@ -418,26 +419,26 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { Some(quote!{ loop { - match #field_name.poll(poll_params) { - Poll::Ready(#network_behaviour_action::GenerateEvent(event)) => { + match #field_name.poll(cx, poll_params) { + std::task::Poll::Ready(#network_behaviour_action::GenerateEvent(event)) => { #net_behv_event_proc::inject_event(self, event) } - Poll::Ready(#network_behaviour_action::DialAddress { address }) => { - return Poll::Ready(#network_behaviour_action::DialAddress { address }); + std::task::Poll::Ready(#network_behaviour_action::DialAddress { address }) => { + return std::task::Poll::Ready(#network_behaviour_action::DialAddress { address }); } - Poll::Ready(#network_behaviour_action::DialPeer { peer_id }) => { - return Poll::Ready(#network_behaviour_action::DialPeer { peer_id }); + std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id }) => { + return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id }); } - Poll::Ready(#network_behaviour_action::SendEvent { peer_id, event }) => { - return Poll::Ready(#network_behaviour_action::SendEvent { + std::task::Poll::Ready(#network_behaviour_action::SendEvent { peer_id, event }) => { + return std::task::Poll::Ready(#network_behaviour_action::SendEvent { peer_id, event: #wrapped_event, }); } - Poll::Ready(#network_behaviour_action::ReportObservedAddr { address }) => { - return Poll::Ready(#network_behaviour_action::ReportObservedAddr { address }); + std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address }) => { + return std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address }); } - Poll::Pending => break, + std::task::Poll::Pending => break, } } }) @@ -526,9 +527,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { fn get_meta_items(attr: &syn::Attribute) -> Option> { if attr.path.segments.len() == 1 && attr.path.segments[0].ident == "behaviour" { - match attr.interpret_meta() { - Some(syn::Meta::List(ref meta)) => Some(meta.nested.iter().cloned().collect()), - _ => { + match attr.parse_meta() { + Ok(syn::Meta::List(ref meta)) => Some(meta.nested.iter().cloned().collect()), + Ok(_) => None, + Err(e) => { + eprintln!("error parsing attribute metadata: {}", e); None } } @@ -542,7 +545,7 @@ fn is_ignored(field: &syn::Field) -> bool { for meta_items in field.attrs.iter().filter_map(get_meta_items) { for meta_item in meta_items { match meta_item { - syn::NestedMeta::Meta(syn::Meta::Word(ref m)) if m == "ignore" => { + syn::NestedMeta::Meta(syn::Meta::Path(ref m)) if m.is_ident("ignore") => { return true; } _ => () diff --git a/misc/core-derive/tests/test.rs b/misc/core-derive/tests/test.rs index 7213a1cf..31752b1c 100644 --- a/misc/core-derive/tests/test.rs +++ b/misc/core-derive/tests/test.rs @@ -46,7 +46,7 @@ fn one_field() { } #[allow(dead_code)] - fn foo() { + fn foo() { require_net_behaviour::>(); } } @@ -71,7 +71,7 @@ fn two_fields() { } #[allow(dead_code)] - fn foo() { + fn foo() { require_net_behaviour::>(); } } @@ -104,7 +104,7 @@ fn three_fields() { } #[allow(dead_code)] - fn foo() { + fn foo() { require_net_behaviour::>(); } } @@ -130,11 +130,11 @@ fn custom_polling() { } impl Foo { - fn foo(&mut self) -> libp2p::futures::Async> { libp2p::futures::Async::NotReady } + fn foo(&mut self) -> std::task::Poll> { std::task::Poll::Pending } } #[allow(dead_code)] - fn foo() { + fn foo() { require_net_behaviour::>(); } } @@ -160,7 +160,7 @@ fn custom_event_no_polling() { } #[allow(dead_code)] - fn foo() { + fn foo() { require_net_behaviour::>(); } } @@ -186,11 +186,11 @@ fn custom_event_and_polling() { } impl Foo { - fn foo(&mut self) -> libp2p::futures::Async> { libp2p::futures::Async::NotReady } + fn foo(&mut self) -> std::task::Poll> { std::task::Poll::Pending } } #[allow(dead_code)] - fn foo() { + fn foo() { require_net_behaviour::>(); } } diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 37e9ad17..578f0b7b 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -185,7 +185,7 @@ impl PingHandler { impl ProtocolsHandler for PingHandler where - TSubstream: AsyncRead + AsyncWrite + Unpin + 'static, + TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static, { type InEvent = Void; type OutEvent = PingResult; diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index 38d0df4f..dbdad493 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -93,7 +93,7 @@ impl Default for Ping { impl NetworkBehaviour for Ping where - TSubstream: AsyncRead + AsyncWrite + Unpin + 'static, + TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static, { type ProtocolsHandler = PingHandler; type OutEvent = PingEvent; diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index ad9cd8ea..a5f105c7 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -18,11 +18,11 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::prelude::*; +use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; use log::debug; use rand::{distributions, prelude::*}; -use std::{io, iter, pin::Pin, time::Duration}; +use std::{io, iter, time::Duration}; use wasm_timer::Instant; /// Represents a prototype for an upgrade to handle the ping protocol. @@ -55,36 +55,35 @@ impl UpgradeInfo for Ping { impl InboundUpgrade for Ping where - TSocket: AsyncRead + AsyncWrite + Unpin + 'static, + TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, { type Output = (); type Error = io::Error; - type Future = Pin>>>; + type Future = BoxFuture<'static, Result<(), io::Error>>; fn upgrade_inbound(self, mut socket: Negotiated, _: Self::Info) -> Self::Future { - Box::pin(async move { + async move { let mut payload = [0u8; 32]; socket.read_exact(&mut payload).await?; socket.write_all(&payload).await?; socket.close().await?; Ok(()) - }) + }.boxed() } } impl OutboundUpgrade for Ping where - TSocket: AsyncRead + AsyncWrite + Unpin + 'static, + TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, { type Output = Duration; type Error = io::Error; - type Future = Pin>>>; + type Future = BoxFuture<'static, Result>; fn upgrade_outbound(self, mut socket: Negotiated, _: Self::Info) -> Self::Future { let payload: [u8; 32] = thread_rng().sample(distributions::Standard); debug!("Preparing ping payload {:?}", payload); - - Box::pin(async move { + async move { socket.write_all(&payload).await?; socket.close().await?; let started = Instant::now(); @@ -96,7 +95,7 @@ where } else { Err(io::Error::new(io::ErrorKind::InvalidData, "Ping payload mismatch")) } - }) + }.boxed() } }