Fix examples and update core-derive. (#1317)

This commit is contained in:
Toralf Wittner 2019-11-25 10:45:04 +01:00 committed by GitHub
parent df71d4a861
commit b7644722ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 170 additions and 207 deletions

View File

@ -45,9 +45,8 @@ libp2p-tcp = { version = "0.13.0", path = "transports/tcp" }
libp2p-websocket = { version = "0.13.0", path = "transports/websocket", optional = true } libp2p-websocket = { version = "0.13.0", path = "transports/websocket", optional = true }
[dev-dependencies] [dev-dependencies]
async-std = "1.0"
env_logger = "0.7.1" env_logger = "0.7.1"
tokio = "0.1"
tokio-stdin-stdout = "0.1"
[workspace] [workspace]
members = [ members = [

View File

@ -49,20 +49,21 @@
//! //!
//! The two nodes then connect. //! The two nodes then connect.
use futures::prelude::*; use async_std::{io, task};
use futures::{future, prelude::*};
use libp2p::{ use libp2p::{
Multiaddr,
PeerId, PeerId,
Swarm, Swarm,
NetworkBehaviour, NetworkBehaviour,
identity, identity,
tokio_codec::{FramedRead, LinesCodec},
tokio_io::{AsyncRead, AsyncWrite},
floodsub::{self, Floodsub, FloodsubEvent}, floodsub::{self, Floodsub, FloodsubEvent},
mdns::{Mdns, MdnsEvent}, mdns::{Mdns, MdnsEvent},
swarm::NetworkBehaviourEventProcess swarm::NetworkBehaviourEventProcess
}; };
use std::{error::Error, task::{Context, Poll}};
fn main() { fn main() -> Result<(), Box<dyn Error>> {
env_logger::init(); env_logger::init();
// Create a random PeerId // Create a random PeerId
@ -71,7 +72,7 @@ fn main() {
println!("Local peer id: {:?}", local_peer_id); println!("Local peer id: {:?}", local_peer_id);
// Set up a an encrypted DNS-enabled TCP Transport over the Mplex and Yamux protocols // Set up a an encrypted DNS-enabled TCP Transport over the Mplex and Yamux protocols
let transport = libp2p::build_development_transport(local_key); let transport = libp2p::build_development_transport(local_key)?;
// Create a Floodsub topic // Create a Floodsub topic
let floodsub_topic = floodsub::TopicBuilder::new("chat").build(); let floodsub_topic = floodsub::TopicBuilder::new("chat").build();
@ -87,12 +88,11 @@ fn main() {
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour<TSubstream> { impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour<TSubstream> {
fn inject_event(&mut self, event: MdnsEvent) { fn inject_event(&mut self, event: MdnsEvent) {
match event { match event {
MdnsEvent::Discovered(list) => { MdnsEvent::Discovered(list) =>
for (peer, _) in list { for (peer, _) in list {
self.floodsub.add_node_to_partial_view(peer); self.floodsub.add_node_to_partial_view(peer);
} }
}, MdnsEvent::Expired(list) =>
MdnsEvent::Expired(list) => {
for (peer, _) in list { for (peer, _) in list {
if !self.mdns.has_node(&peer) { if !self.mdns.has_node(&peer) {
self.floodsub.remove_node_from_partial_view(&peer); self.floodsub.remove_node_from_partial_view(&peer);
@ -101,7 +101,6 @@ fn main() {
} }
} }
} }
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<FloodsubEvent> for MyBehaviour<TSubstream> { impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<FloodsubEvent> for MyBehaviour<TSubstream> {
// Called when `floodsub` produces an event. // Called when `floodsub` produces an event.
@ -114,9 +113,10 @@ fn main() {
// Create a Swarm to manage peers and events // Create a Swarm to manage peers and events
let mut swarm = { let mut swarm = {
let mdns = task::block_on(Mdns::new())?;
let mut behaviour = MyBehaviour { let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(local_peer_id.clone()), floodsub: Floodsub::new(local_peer_id.clone()),
mdns: Mdns::new().expect("Failed to create mDNS service"), mdns
}; };
behaviour.floodsub.subscribe(floodsub_topic.clone()); behaviour.floodsub.subscribe(floodsub_topic.clone());
@ -125,42 +125,32 @@ fn main() {
// Reach out to another node if specified // Reach out to another node if specified
if let Some(to_dial) = std::env::args().nth(1) { if let Some(to_dial) = std::env::args().nth(1) {
let dialing = to_dial.clone(); let addr: Multiaddr = to_dial.parse()?;
match to_dial.parse() { Swarm::dial_addr(&mut swarm, addr)?;
Ok(to_dial) => { println!("Dialed {:?}", to_dial)
match libp2p::Swarm::dial_addr(&mut swarm, to_dial) {
Ok(_) => println!("Dialed {:?}", dialing),
Err(e) => println!("Dial {:?} failed: {:?}", dialing, e)
}
},
Err(err) => println!("Failed to parse address to dial: {:?}", err),
}
} }
// Read full lines from stdin // Read full lines from stdin
let stdin = tokio_stdin_stdout::stdin(0); let mut stdin = io::BufReader::new(io::stdin()).lines();
let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new());
// Listen on all interfaces and whatever port the OS assigns // Listen on all interfaces and whatever port the OS assigns
Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off // Kick it off
let mut listening = false; let mut listening = false;
tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { task::block_on(future::poll_fn(move |cx: &mut Context| {
loop { loop {
match framed_stdin.poll().expect("Error while polling stdin") { match stdin.try_poll_next_unpin(cx)? {
Async::Ready(Some(line)) => swarm.floodsub.publish(&floodsub_topic, line.as_bytes()), Poll::Ready(Some(line)) => swarm.floodsub.publish(&floodsub_topic, line.as_bytes()),
Async::Ready(None) => panic!("Stdin closed"), Poll::Ready(None) => panic!("Stdin closed"),
Async::NotReady => break, Poll::Pending => break
}; }
} }
loop { loop {
match swarm.poll().expect("Error while polling swarm") { match swarm.poll_next_unpin(cx) {
Async::Ready(Some(_)) => { Poll::Ready(Some(event)) => println!("{:?}", event),
Poll::Ready(None) => return Poll::Ready(Ok(())),
}, Poll::Pending => {
Async::Ready(None) | Async::NotReady => {
if !listening { if !listening {
if let Some(a) = Swarm::listeners(&swarm).next() { if let Some(a) = Swarm::listeners(&swarm).next() {
println!("Listening on {:?}", a); println!("Listening on {:?}", a);
@ -171,7 +161,6 @@ fn main() {
} }
} }
} }
Poll::Pending
Ok(Async::NotReady) }))
}));
} }

View File

@ -29,19 +29,22 @@
//! //!
//! 4. Close with Ctrl-c. //! 4. Close with Ctrl-c.
use async_std::{io, task};
use futures::prelude::*; use futures::prelude::*;
use libp2p::kad::record::store::MemoryStore; use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{record::Key, Kademlia, KademliaEvent, PutRecordOk, Quorum, Record}; use libp2p::kad::{record::Key, Kademlia, KademliaEvent, PutRecordOk, Quorum, Record};
use libp2p::{ use libp2p::{
build_development_transport, identity, NetworkBehaviour,
PeerId,
Swarm,
build_development_transport,
identity,
mdns::{Mdns, MdnsEvent}, mdns::{Mdns, MdnsEvent},
swarm::NetworkBehaviourEventProcess, swarm::NetworkBehaviourEventProcess
tokio_codec::{FramedRead, LinesCodec},
tokio_io::{AsyncRead, AsyncWrite},
NetworkBehaviour, PeerId, Swarm,
}; };
use std::{error::Error, task::{Context, Poll}};
fn main() { fn main() -> Result<(), Box<dyn Error>> {
env_logger::init(); env_logger::init();
// Create a random key for ourselves. // Create a random key for ourselves.
@ -49,17 +52,18 @@ fn main() {
let local_peer_id = PeerId::from(local_key.public()); let local_peer_id = PeerId::from(local_key.public());
// Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol. // Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol.
let transport = build_development_transport(local_key); let transport = build_development_transport(local_key)?;
// We create a custom network behaviour that combines Kademlia and mDNS. // We create a custom network behaviour that combines Kademlia and mDNS.
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
struct MyBehaviour<TSubstream: AsyncRead + AsyncWrite> { struct MyBehaviour<TSubstream: AsyncRead + AsyncWrite> {
kademlia: Kademlia<TSubstream, MemoryStore>, kademlia: Kademlia<TSubstream, MemoryStore>,
mdns: Mdns<TSubstream>, mdns: Mdns<TSubstream>
} }
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<MdnsEvent> impl<T> NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour<T>
for MyBehaviour<TSubstream> where
T: AsyncRead + AsyncWrite
{ {
// Called when `mdns` produces an event. // Called when `mdns` produces an event.
fn inject_event(&mut self, event: MdnsEvent) { fn inject_event(&mut self, event: MdnsEvent) {
@ -71,8 +75,9 @@ fn main() {
} }
} }
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<KademliaEvent> impl<T> NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour<T>
for MyBehaviour<TSubstream> where
T: AsyncRead + AsyncWrite
{ {
// Called when `kademlia` produces an event. // Called when `kademlia` produces an event.
fn inject_event(&mut self, message: KademliaEvent) { fn inject_event(&mut self, message: KademliaEvent) {
@ -108,58 +113,50 @@ fn main() {
// Create a Kademlia behaviour. // Create a Kademlia behaviour.
let store = MemoryStore::new(local_peer_id.clone()); let store = MemoryStore::new(local_peer_id.clone());
let kademlia = Kademlia::new(local_peer_id.clone(), store); let kademlia = Kademlia::new(local_peer_id.clone(), store);
let mdns = task::block_on(Mdns::new())?;
let behaviour = MyBehaviour { let behaviour = MyBehaviour { kademlia, mdns };
kademlia,
mdns: Mdns::new().expect("Failed to create mDNS service"),
};
Swarm::new(transport, behaviour, local_peer_id) Swarm::new(transport, behaviour, local_peer_id)
}; };
// Read full lines from stdin. // Read full lines from stdin
let stdin = tokio_stdin_stdout::stdin(0); let mut stdin = io::BufReader::new(io::stdin()).lines();
let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new());
// Listen on all interfaces and whatever port the OS assigns. // Listen on all interfaces and whatever port the OS assigns.
Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off. // Kick it off.
let mut listening = false; let mut listening = false;
tokio::run(futures::future::poll_fn(move || { task::block_on(future::poll_fn(move |cx: &mut Context| {
loop { loop {
match framed_stdin.poll().expect("Error while polling stdin") { match stdin.try_poll_next_unpin(cx)? {
Async::Ready(Some(line)) => { Poll::Ready(Some(line)) => handle_input_line(&mut swarm.kademlia, line),
handle_input_line(&mut swarm.kademlia, line); Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break
} }
Async::Ready(None) => panic!("Stdin closed"),
Async::NotReady => break,
};
} }
loop { loop {
match swarm.poll().expect("Error while polling swarm") { match swarm.poll_next_unpin(cx) {
Async::Ready(Some(_)) => {} Poll::Ready(Some(event)) => println!("{:?}", event),
Async::Ready(None) | Async::NotReady => { Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => {
if !listening { if !listening {
if let Some(a) = Swarm::listeners(&swarm).next() { if let Some(a) = Swarm::listeners(&swarm).next() {
println!("Listening on {:?}", a); println!("Listening on {:?}", a);
listening = true; listening = true;
} }
} }
break; break
} }
} }
} }
Poll::Pending
}))
}
Ok(Async::NotReady) fn handle_input_line<T>(kademlia: &mut Kademlia<T, MemoryStore>, line: String)
})); where
} T: AsyncRead + AsyncWrite
{
fn handle_input_line<TSubstream: AsyncRead + AsyncWrite>(
kademlia: &mut Kademlia<TSubstream, MemoryStore>,
line: String,
) {
let mut args = line.split(" "); let mut args = line.split(" ");
match args.next() { match args.next() {

View File

@ -23,6 +23,7 @@
//! You can pass as parameter a base58 peer ID to search for. If you don't pass any parameter, a //! You can pass as parameter a base58 peer ID to search for. If you don't pass any parameter, a
//! peer ID will be generated randomly. //! peer ID will be generated randomly.
use async_std::task;
use futures::prelude::*; use futures::prelude::*;
use libp2p::{ use libp2p::{
Swarm, Swarm,
@ -32,10 +33,9 @@ use libp2p::{
}; };
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, GetClosestPeersError}; use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, GetClosestPeersError};
use libp2p::kad::record::store::MemoryStore; use libp2p::kad::record::store::MemoryStore;
use std::env; use std::{env, error::Error, time::Duration};
use std::time::Duration;
fn main() { fn main() -> Result<(), Box<dyn Error>> {
env_logger::init(); env_logger::init();
// Create a random key for ourselves. // Create a random key for ourselves.
@ -43,7 +43,7 @@ fn main() {
let local_peer_id = PeerId::from(local_key.public()); let local_peer_id = PeerId::from(local_key.public());
// Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol // Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol
let transport = build_development_transport(local_key); let transport = build_development_transport(local_key)?;
// Create a swarm to manage peers and events. // Create a swarm to manage peers and events.
let mut swarm = { let mut swarm = {
@ -60,7 +60,7 @@ fn main() {
behaviour.add_address(&"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());*/ behaviour.add_address(&"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());*/
// The only address that currently works. // The only address that currently works.
behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap()); behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse()?, "/ip4/104.131.131.82/tcp/4001".parse()?);
// The following addresses always fail signature verification, possibly due to // The following addresses always fail signature verification, possibly due to
// RSA keys with < 2048 bits. // RSA keys with < 2048 bits.
@ -80,7 +80,7 @@ fn main() {
// Order Kademlia to search for a peer. // Order Kademlia to search for a peer.
let to_search: PeerId = if let Some(peer_id) = env::args().nth(1) { let to_search: PeerId = if let Some(peer_id) = env::args().nth(1) {
peer_id.parse().expect("Failed to parse peer ID to find") peer_id.parse()?
} else { } else {
identity::Keypair::generate_ed25519().public().into() identity::Keypair::generate_ed25519().public().into()
}; };
@ -89,25 +89,21 @@ fn main() {
swarm.get_closest_peers(to_search); swarm.get_closest_peers(to_search);
// Kick it off! // Kick it off!
tokio::run(futures::future::poll_fn(move || { task::block_on(async move {
loop { while let Some(event) = swarm.try_next().await? {
match swarm.poll().expect("Error while polling swarm") { if let KademliaEvent::GetClosestPeersResult(result) = event {
Async::Ready(Some(KademliaEvent::GetClosestPeersResult(res))) => { match result {
match res { Ok(ok) =>
Ok(ok) => {
if !ok.peers.is_empty() { if !ok.peers.is_empty() {
println!("Query finished with closest peers: {:#?}", ok.peers); println!("Query finished with closest peers: {:#?}", ok.peers)
return Ok(Async::Ready(()));
} else { } else {
// The example is considered failed as there // The example is considered failed as there
// should always be at least 1 reachable peer. // should always be at least 1 reachable peer.
panic!("Query finished with no closest peers."); panic!("Query finished with no closest peers.")
} }
} Err(GetClosestPeersError::Timeout { peers, .. }) =>
Err(GetClosestPeersError::Timeout { peers, .. }) => {
if !peers.is_empty() { if !peers.is_empty() {
println!("Query timed out with closest peers: {:#?}", peers); println!("Query timed out with closest peers: {:#?}", peers)
return Ok(Async::Ready(()));
} else { } else {
// The example is considered failed as there // The example is considered failed as there
// should always be at least 1 reachable peer. // should always be at least 1 reachable peer.
@ -115,12 +111,7 @@ fn main() {
} }
} }
} }
},
Async::Ready(Some(_)) => {},
Async::Ready(None) | Async::NotReady => break,
} }
} Ok(())
})
Ok(Async::NotReady)
}));
} }

View File

@ -18,26 +18,17 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::prelude::*; use async_std::task;
use libp2p::mdns::service::{MdnsPacket, MdnsService}; use libp2p::mdns::service::{MdnsPacket, MdnsService};
use std::io; use std::error::Error;
fn main() { fn main() -> Result<(), Box<dyn Error>> {
// This example provides passive discovery of the libp2p nodes on the network that send // This example provides passive discovery of the libp2p nodes on the
// mDNS queries and answers. // network that send mDNS queries and answers.
task::block_on(async move {
// We start by creating the service. let mut service = MdnsService::new().await?;
let mut service = MdnsService::new().expect("Error while creating mDNS service");
// Create a never-ending `Future` that polls the service for events.
let future = futures::future::poll_fn(move || -> Poll<(), io::Error> {
loop { loop {
// Grab the next available packet from the service. let (srv, packet) = service.next().await;
let packet = match service.poll() {
Async::Ready(packet) => packet,
Async::NotReady => return Ok(Async::NotReady),
};
match packet { match packet {
MdnsPacket::Query(query) => { MdnsPacket::Query(query) => {
// We detected a libp2p mDNS query on the network. In a real application, you // We detected a libp2p mDNS query on the network. In a real application, you
@ -63,9 +54,7 @@ fn main() {
println!("Detected service query from {:?}", query.remote_addr()); println!("Detected service query from {:?}", query.remote_addr());
} }
} }
service = srv
} }
}); })
// Blocks the thread until the future runs to completion (which will never happen).
tokio::run(future.map_err(|err| panic!("{:?}", err)));
} }

View File

@ -38,11 +38,12 @@
//! The two nodes establish a connection, negotiate the ping protocol //! The two nodes establish a connection, negotiate the ping protocol
//! and begin pinging each other. //! and begin pinging each other.
use futures::{prelude::*, future}; use async_std::task;
use futures::{future, prelude::*};
use libp2p::{identity, PeerId, ping::{Ping, PingConfig}, Swarm}; use libp2p::{identity, PeerId, ping::{Ping, PingConfig}, Swarm};
use std::env; use std::{error::Error, task::{Context, Poll}};
fn main() { fn main() -> Result<(), Box<dyn Error>> {
env_logger::init(); env_logger::init();
// Create a random PeerId. // Create a random PeerId.
@ -51,7 +52,7 @@ fn main() {
println!("Local peer id: {:?}", peer_id); println!("Local peer id: {:?}", peer_id);
// Create a transport. // Create a transport.
let transport = libp2p::build_development_transport(id_keys); let transport = libp2p::build_development_transport(id_keys)?;
// Create a ping network behaviour. // Create a ping network behaviour.
// //
@ -66,38 +67,33 @@ fn main() {
// Dial the peer identified by the multi-address given as the second // Dial the peer identified by the multi-address given as the second
// command-line argument, if any. // command-line argument, if any.
if let Some(addr) = env::args().nth(1) { if let Some(addr) = std::env::args().nth(1) {
let remote_addr = addr.clone(); let remote = addr.parse()?;
match addr.parse() { Swarm::dial_addr(&mut swarm, remote)?;
Ok(remote) => { println!("Dialed {}", addr)
match Swarm::dial_addr(&mut swarm, remote) {
Ok(()) => println!("Dialed {:?}", remote_addr),
Err(e) => println!("Dialing {:?} failed with: {:?}", remote_addr, e)
}
},
Err(err) => println!("Failed to parse address to dial: {:?}", err),
}
} }
// Tell the swarm to listen on all interfaces and a random, OS-assigned port. // Tell the swarm to listen on all interfaces and a random, OS-assigned port.
Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?;
// Use tokio to drive the `Swarm`.
let mut listening = false; let mut listening = false;
tokio::run(future::poll_fn(move || -> Result<_, ()> { task::block_on(future::poll_fn(move |cx: &mut Context| {
loop { loop {
match swarm.poll().expect("Error while polling swarm") { match swarm.poll_next_unpin(cx) {
Async::Ready(Some(e)) => println!("{:?}", e), Poll::Ready(Some(event)) => println!("{:?}", event),
Async::Ready(None) | Async::NotReady => { Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => {
if !listening { if !listening {
if let Some(a) = Swarm::listeners(&swarm).next() { for addr in Swarm::listeners(&swarm) {
println!("Listening on {:?}", a); println!("Listening on {}", addr);
listening = true; listening = true;
} }
} }
return Ok(Async::NotReady) return Poll::Pending
} }
} }
} }
})); }));
Ok(())
} }

View File

@ -13,8 +13,8 @@ categories = ["network-programming", "asynchronous"]
proc-macro = true proc-macro = true
[dependencies] [dependencies]
syn = { version = "0.15.22", default-features = false, features = ["clone-impls", "derive", "parsing", "printing", "proc-macro"] } syn = { version = "1.0.8", default-features = false, features = ["clone-impls", "derive", "parsing", "printing", "proc-macro"] }
quote = "0.6" quote = "1.0"
[dev-dependencies] [dev-dependencies]
libp2p = { version = "0.13.0", path = "../.." } libp2p = { version = "0.13.0", path = "../.." }

View File

@ -96,8 +96,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
additional.push(quote!{#substream_generic: ::libp2p::tokio_io::AsyncRead}); additional.push(quote!{#substream_generic: ::libp2p::futures::io::AsyncRead});
additional.push(quote!{#substream_generic: ::libp2p::tokio_io::AsyncWrite}); additional.push(quote!{#substream_generic: ::libp2p::futures::io::AsyncWrite});
additional.push(quote!{#substream_generic: Unpin});
if let Some(where_clause) = where_clause { if let Some(where_clause) = where_clause {
if where_clause.predicates.trailing_punct() { if where_clause.predicates.trailing_punct() {
@ -118,7 +119,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
for meta_items in ast.attrs.iter().filter_map(get_meta_items) { for meta_items in ast.attrs.iter().filter_map(get_meta_items) {
for meta_item in meta_items { for meta_item in meta_items {
match meta_item { match meta_item {
syn::NestedMeta::Meta(syn::Meta::NameValue(ref m)) if m.ident == "out_event" => { syn::NestedMeta::Meta(syn::Meta::NameValue(ref m)) if m.path.is_ident("out_event") => {
if let syn::Lit::Str(ref s) = m.lit { if let syn::Lit::Str(ref s) = m.lit {
let ident: syn::Type = syn::parse_str(&s.value()).unwrap(); let ident: syn::Type = syn::parse_str(&s.value()).unwrap();
out = quote!{#ident}; out = quote!{#ident};
@ -381,11 +382,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
// If we find a `#[behaviour(poll_method = "poll")]` attribute on the struct, we call // If we find a `#[behaviour(poll_method = "poll")]` attribute on the struct, we call
// `self.poll()` at the end of the polling. // `self.poll()` at the end of the polling.
let poll_method = { let poll_method = {
let mut poll_method = quote!{Poll::Pending}; let mut poll_method = quote!{std::task::Poll::Pending};
for meta_items in ast.attrs.iter().filter_map(get_meta_items) { for meta_items in ast.attrs.iter().filter_map(get_meta_items) {
for meta_item in meta_items { for meta_item in meta_items {
match meta_item { match meta_item {
syn::NestedMeta::Meta(syn::Meta::NameValue(ref m)) if m.ident == "poll_method" => { syn::NestedMeta::Meta(syn::Meta::NameValue(ref m)) if m.path.is_ident("poll_method") => {
if let syn::Lit::Str(ref s) = m.lit { if let syn::Lit::Str(ref s) = m.lit {
let ident: Ident = syn::parse_str(&s.value()).unwrap(); let ident: Ident = syn::parse_str(&s.value()).unwrap();
poll_method = quote!{#name::#ident(self)}; poll_method = quote!{#name::#ident(self)};
@ -418,26 +419,26 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
Some(quote!{ Some(quote!{
loop { loop {
match #field_name.poll(poll_params) { match #field_name.poll(cx, poll_params) {
Poll::Ready(#network_behaviour_action::GenerateEvent(event)) => { std::task::Poll::Ready(#network_behaviour_action::GenerateEvent(event)) => {
#net_behv_event_proc::inject_event(self, event) #net_behv_event_proc::inject_event(self, event)
} }
Poll::Ready(#network_behaviour_action::DialAddress { address }) => { std::task::Poll::Ready(#network_behaviour_action::DialAddress { address }) => {
return Poll::Ready(#network_behaviour_action::DialAddress { address }); return std::task::Poll::Ready(#network_behaviour_action::DialAddress { address });
} }
Poll::Ready(#network_behaviour_action::DialPeer { peer_id }) => { std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id }) => {
return Poll::Ready(#network_behaviour_action::DialPeer { peer_id }); return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id });
} }
Poll::Ready(#network_behaviour_action::SendEvent { peer_id, event }) => { std::task::Poll::Ready(#network_behaviour_action::SendEvent { peer_id, event }) => {
return Poll::Ready(#network_behaviour_action::SendEvent { return std::task::Poll::Ready(#network_behaviour_action::SendEvent {
peer_id, peer_id,
event: #wrapped_event, event: #wrapped_event,
}); });
} }
Poll::Ready(#network_behaviour_action::ReportObservedAddr { address }) => { std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address }) => {
return Poll::Ready(#network_behaviour_action::ReportObservedAddr { address }); return std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address });
} }
Poll::Pending => break, std::task::Poll::Pending => break,
} }
} }
}) })
@ -526,9 +527,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
fn get_meta_items(attr: &syn::Attribute) -> Option<Vec<syn::NestedMeta>> { fn get_meta_items(attr: &syn::Attribute) -> Option<Vec<syn::NestedMeta>> {
if attr.path.segments.len() == 1 && attr.path.segments[0].ident == "behaviour" { if attr.path.segments.len() == 1 && attr.path.segments[0].ident == "behaviour" {
match attr.interpret_meta() { match attr.parse_meta() {
Some(syn::Meta::List(ref meta)) => Some(meta.nested.iter().cloned().collect()), Ok(syn::Meta::List(ref meta)) => Some(meta.nested.iter().cloned().collect()),
_ => { Ok(_) => None,
Err(e) => {
eprintln!("error parsing attribute metadata: {}", e);
None None
} }
} }
@ -542,7 +545,7 @@ fn is_ignored(field: &syn::Field) -> bool {
for meta_items in field.attrs.iter().filter_map(get_meta_items) { for meta_items in field.attrs.iter().filter_map(get_meta_items) {
for meta_item in meta_items { for meta_item in meta_items {
match meta_item { match meta_item {
syn::NestedMeta::Meta(syn::Meta::Word(ref m)) if m == "ignore" => { syn::NestedMeta::Meta(syn::Meta::Path(ref m)) if m.is_ident("ignore") => {
return true; return true;
} }
_ => () _ => ()

View File

@ -46,7 +46,7 @@ fn one_field() {
} }
#[allow(dead_code)] #[allow(dead_code)]
fn foo<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite>() { fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>(); require_net_behaviour::<Foo<TSubstream>>();
} }
} }
@ -71,7 +71,7 @@ fn two_fields() {
} }
#[allow(dead_code)] #[allow(dead_code)]
fn foo<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite>() { fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>(); require_net_behaviour::<Foo<TSubstream>>();
} }
} }
@ -104,7 +104,7 @@ fn three_fields() {
} }
#[allow(dead_code)] #[allow(dead_code)]
fn foo<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite>() { fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>(); require_net_behaviour::<Foo<TSubstream>>();
} }
} }
@ -130,11 +130,11 @@ fn custom_polling() {
} }
impl<TSubstream> Foo<TSubstream> { impl<TSubstream> Foo<TSubstream> {
fn foo<T>(&mut self) -> libp2p::futures::Async<libp2p::swarm::NetworkBehaviourAction<T, ()>> { libp2p::futures::Async::NotReady } fn foo<T>(&mut self) -> std::task::Poll<libp2p::swarm::NetworkBehaviourAction<T, ()>> { std::task::Poll::Pending }
} }
#[allow(dead_code)] #[allow(dead_code)]
fn foo<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite>() { fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>(); require_net_behaviour::<Foo<TSubstream>>();
} }
} }
@ -160,7 +160,7 @@ fn custom_event_no_polling() {
} }
#[allow(dead_code)] #[allow(dead_code)]
fn foo<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite>() { fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>(); require_net_behaviour::<Foo<TSubstream>>();
} }
} }
@ -186,11 +186,11 @@ fn custom_event_and_polling() {
} }
impl<TSubstream> Foo<TSubstream> { impl<TSubstream> Foo<TSubstream> {
fn foo<T>(&mut self) -> libp2p::futures::Async<libp2p::swarm::NetworkBehaviourAction<T, String>> { libp2p::futures::Async::NotReady } fn foo<T>(&mut self) -> std::task::Poll<libp2p::swarm::NetworkBehaviourAction<T, String>> { std::task::Poll::Pending }
} }
#[allow(dead_code)] #[allow(dead_code)]
fn foo<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite>() { fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>(); require_net_behaviour::<Foo<TSubstream>>();
} }
} }

View File

@ -185,7 +185,7 @@ impl<TSubstream> PingHandler<TSubstream> {
impl<TSubstream> ProtocolsHandler for PingHandler<TSubstream> impl<TSubstream> ProtocolsHandler for PingHandler<TSubstream>
where where
TSubstream: AsyncRead + AsyncWrite + Unpin + 'static, TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
type InEvent = Void; type InEvent = Void;
type OutEvent = PingResult; type OutEvent = PingResult;

View File

@ -93,7 +93,7 @@ impl<TSubstream> Default for Ping<TSubstream> {
impl<TSubstream> NetworkBehaviour for Ping<TSubstream> impl<TSubstream> NetworkBehaviour for Ping<TSubstream>
where where
TSubstream: AsyncRead + AsyncWrite + Unpin + 'static, TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
type ProtocolsHandler = PingHandler<TSubstream>; type ProtocolsHandler = PingHandler<TSubstream>;
type OutEvent = PingEvent; type OutEvent = PingEvent;

View File

@ -18,11 +18,11 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::prelude::*; use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated};
use log::debug; use log::debug;
use rand::{distributions, prelude::*}; use rand::{distributions, prelude::*};
use std::{io, iter, pin::Pin, time::Duration}; use std::{io, iter, time::Duration};
use wasm_timer::Instant; use wasm_timer::Instant;
/// Represents a prototype for an upgrade to handle the ping protocol. /// Represents a prototype for an upgrade to handle the ping protocol.
@ -55,36 +55,35 @@ impl UpgradeInfo for Ping {
impl<TSocket> InboundUpgrade<TSocket> for Ping impl<TSocket> InboundUpgrade<TSocket> for Ping
where where
TSocket: AsyncRead + AsyncWrite + Unpin + 'static, TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
type Output = (); type Output = ();
type Error = io::Error; type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<(), io::Error>>>>; type Future = BoxFuture<'static, Result<(), io::Error>>;
fn upgrade_inbound(self, mut socket: Negotiated<TSocket>, _: Self::Info) -> Self::Future { fn upgrade_inbound(self, mut socket: Negotiated<TSocket>, _: Self::Info) -> Self::Future {
Box::pin(async move { async move {
let mut payload = [0u8; 32]; let mut payload = [0u8; 32];
socket.read_exact(&mut payload).await?; socket.read_exact(&mut payload).await?;
socket.write_all(&payload).await?; socket.write_all(&payload).await?;
socket.close().await?; socket.close().await?;
Ok(()) Ok(())
}) }.boxed()
} }
} }
impl<TSocket> OutboundUpgrade<TSocket> for Ping impl<TSocket> OutboundUpgrade<TSocket> for Ping
where where
TSocket: AsyncRead + AsyncWrite + Unpin + 'static, TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
type Output = Duration; type Output = Duration;
type Error = io::Error; type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Duration, io::Error>>>>; type Future = BoxFuture<'static, Result<Duration, io::Error>>;
fn upgrade_outbound(self, mut socket: Negotiated<TSocket>, _: Self::Info) -> Self::Future { fn upgrade_outbound(self, mut socket: Negotiated<TSocket>, _: Self::Info) -> Self::Future {
let payload: [u8; 32] = thread_rng().sample(distributions::Standard); let payload: [u8; 32] = thread_rng().sample(distributions::Standard);
debug!("Preparing ping payload {:?}", payload); debug!("Preparing ping payload {:?}", payload);
async move {
Box::pin(async move {
socket.write_all(&payload).await?; socket.write_all(&payload).await?;
socket.close().await?; socket.close().await?;
let started = Instant::now(); let started = Instant::now();
@ -96,7 +95,7 @@ where
} else { } else {
Err(io::Error::new(io::ErrorKind::InvalidData, "Ping payload mismatch")) Err(io::Error::new(io::ErrorKind::InvalidData, "Ping payload mismatch"))
} }
}) }.boxed()
} }
} }