swarm/: Drive ExpandedSwarm via Stream trait only (#2100)

Change `Stream` implementation of `ExpandedSwarm` to return all
`SwarmEvents` instead of only the `NetworkBehaviour`'s events.

Remove `ExpandedSwarm::next_event`. Users can use `<ExpandedSwarm as
StreamExt>::next` instead.

Remove `ExpandedSwarm::next`. Users can use `<ExpandedSwarm as
StreamExt>::filter_map` instead.
This commit is contained in:
Elena Frank
2021-06-14 20:41:44 +02:00
committed by GitHub
parent d45606ac97
commit e8fed53598
20 changed files with 340 additions and 331 deletions

View File

@ -36,11 +36,11 @@
//! --features="floodsub mplex noise tcp-tokio mdns" //! --features="floodsub mplex noise tcp-tokio mdns"
//! ``` //! ```
use futures::StreamExt;
use libp2p::{ use libp2p::{
Multiaddr, Multiaddr,
NetworkBehaviour, NetworkBehaviour,
PeerId, PeerId,
Swarm,
Transport, Transport,
core::upgrade, core::upgrade,
identity, identity,
@ -48,7 +48,7 @@ use libp2p::{
mdns::{Mdns, MdnsEvent}, mdns::{Mdns, MdnsEvent},
mplex, mplex,
noise, noise,
swarm::{NetworkBehaviourEventProcess, SwarmBuilder}, swarm::{NetworkBehaviourEventProcess, SwarmBuilder, SwarmEvent},
// `TokioTcpConfig` is available through the `tcp-tokio` feature. // `TokioTcpConfig` is available through the `tcp-tokio` feature.
tcp::TokioTcpConfig, tcp::TokioTcpConfig,
}; };
@ -149,29 +149,16 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off // Kick it off
let mut listening = false;
loop { loop {
let to_publish = { tokio::select! {
tokio::select! { line = stdin.next_line() => {
line = stdin.next_line() => { let line = line?.expect("stdin closed");
let line = line?.expect("stdin closed"); swarm.behaviour_mut().floodsub.publish(floodsub_topic.clone(), line.as_bytes());
Some((floodsub_topic.clone(), line))
}
event = swarm.next() => {
// All events are handled by the `NetworkBehaviourEventProcess`es.
// I.e. the `swarm.next()` future drives the `Swarm` without ever
// terminating.
panic!("Unexpected event: {:?}", event);
}
} }
}; event = swarm.select_next_some() => {
if let Some((topic, line)) = to_publish { if let SwarmEvent::NewListenAddr(addr) = event {
swarm.behaviour_mut().floodsub.publish(topic, line.as_bytes()); println!("Listening on {:?}", addr);
} }
if !listening {
for addr in Swarm::listeners(&swarm) {
println!("Listening on {:?}", addr);
listening = true;
} }
} }
} }

View File

@ -59,7 +59,7 @@ use libp2p::{
identity, identity,
floodsub::{self, Floodsub, FloodsubEvent}, floodsub::{self, Floodsub, FloodsubEvent},
mdns::{Mdns, MdnsConfig, MdnsEvent}, mdns::{Mdns, MdnsConfig, MdnsEvent},
swarm::NetworkBehaviourEventProcess swarm::{NetworkBehaviourEventProcess, SwarmEvent}
}; };
use std::{error::Error, task::{Context, Poll}}; use std::{error::Error, task::{Context, Poll}};
@ -124,7 +124,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut swarm = { let mut swarm = {
let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?; let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?;
let mut behaviour = MyBehaviour { let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(local_peer_id.clone()), floodsub: Floodsub::new(local_peer_id),
mdns, mdns,
ignored_member: false, ignored_member: false,
}; };
@ -147,7 +147,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off // Kick it off
let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop { loop {
match stdin.try_poll_next_unpin(cx)? { match stdin.try_poll_next_unpin(cx)? {
@ -160,17 +159,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
} }
loop { loop {
match swarm.poll_next_unpin(cx) { match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => println!("{:?}", event), Poll::Ready(Some(event)) => {
Poll::Ready(None) => return Poll::Ready(Ok(())), if let SwarmEvent::NewListenAddr(addr) = event {
Poll::Pending => { println!("Listening on {:?}", addr);
if !listening {
for addr in Swarm::listeners(&swarm) {
println!("Listening on {:?}", addr);
listening = true;
}
} }
break
} }
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
} }
} }
Poll::Pending Poll::Pending

View File

@ -61,7 +61,7 @@ use libp2p::{
development_transport, development_transport,
identity, identity,
mdns::{Mdns, MdnsConfig, MdnsEvent}, mdns::{Mdns, MdnsConfig, MdnsEvent},
swarm::NetworkBehaviourEventProcess swarm::{NetworkBehaviourEventProcess, SwarmEvent}
}; };
use std::{error::Error, task::{Context, Poll}}; use std::{error::Error, task::{Context, Poll}};
@ -150,8 +150,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a swarm to manage peers and events. // Create a swarm to manage peers and events.
let mut swarm = { let mut swarm = {
// Create a Kademlia behaviour. // Create a Kademlia behaviour.
let store = MemoryStore::new(local_peer_id.clone()); let store = MemoryStore::new(local_peer_id);
let kademlia = Kademlia::new(local_peer_id.clone(), store); let kademlia = Kademlia::new(local_peer_id, store);
let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?; let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?;
let behaviour = MyBehaviour { kademlia, mdns }; let behaviour = MyBehaviour { kademlia, mdns };
Swarm::new(transport, behaviour, local_peer_id) Swarm::new(transport, behaviour, local_peer_id)
@ -164,7 +164,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off. // Kick it off.
let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop { loop {
match stdin.try_poll_next_unpin(cx)? { match stdin.try_poll_next_unpin(cx)? {
@ -175,17 +174,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
} }
loop { loop {
match swarm.poll_next_unpin(cx) { match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => println!("{:?}", event), Poll::Ready(Some(event)) => {
Poll::Ready(None) => return Poll::Ready(Ok(())), if let SwarmEvent::NewListenAddr(addr) = event {
Poll::Pending => { println!("Listening on {:?}", addr);
if !listening {
if let Some(a) = Swarm::listeners(&swarm).next() {
println!("Listening on {:?}", a);
listening = true;
}
} }
break
} }
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
} }
} }
Poll::Pending Poll::Pending
@ -193,7 +188,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
} }
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) { fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
let mut args = line.split(" "); let mut args = line.split(' ');
match args.next() { match args.next() {
Some("GET") => { Some("GET") => {

View File

@ -53,7 +53,7 @@ use libp2p::gossipsub::MessageId;
use libp2p::gossipsub::{ use libp2p::gossipsub::{
GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity, ValidationMode, GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity, ValidationMode,
}; };
use libp2p::{gossipsub, identity, PeerId}; use libp2p::{gossipsub, identity, swarm::SwarmEvent, PeerId};
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::time::Duration; use std::time::Duration;
@ -136,7 +136,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut stdin = io::BufReader::new(io::stdin()).lines(); let mut stdin = io::BufReader::new(io::stdin()).lines();
// Kick it off // Kick it off
let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop { loop {
if let Err(e) = match stdin.try_poll_next_unpin(cx)? { if let Err(e) = match stdin.try_poll_next_unpin(cx)? {
@ -152,30 +151,26 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop { loop {
match swarm.poll_next_unpin(cx) { match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(gossip_event)) => match gossip_event { Poll::Ready(Some(event)) => match event {
GossipsubEvent::Message { SwarmEvent::Behaviour(GossipsubEvent::Message {
propagation_source: peer_id, propagation_source: peer_id,
message_id: id, message_id: id,
message, message,
} => println!( }) => println!(
"Got message: {} with id: {} from peer: {:?}", "Got message: {} with id: {} from peer: {:?}",
String::from_utf8_lossy(&message.data), String::from_utf8_lossy(&message.data),
id, id,
peer_id peer_id
), ),
SwarmEvent::NewListenAddr(addr) => {
println!("Listening on {:?}", addr);
}
_ => {} _ => {}
}, },
Poll::Ready(None) | Poll::Pending => break, Poll::Ready(None) | Poll::Pending => break,
} }
} }
if !listening {
for addr in libp2p::Swarm::listeners(&swarm) {
println!("Listening on {:?}", addr);
listening = true;
}
}
Poll::Pending Poll::Pending
})) }))
} }

View File

@ -24,9 +24,10 @@
//! peer ID will be generated randomly. //! peer ID will be generated randomly.
use async_std::task; use async_std::task;
use futures::StreamExt;
use libp2p::{ use libp2p::{
Multiaddr, Multiaddr,
Swarm, swarm::{Swarm, SwarmEvent},
PeerId, PeerId,
identity, identity,
development_transport development_transport
@ -64,8 +65,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a Kademlia behaviour. // Create a Kademlia behaviour.
let mut cfg = KademliaConfig::default(); let mut cfg = KademliaConfig::default();
cfg.set_query_timeout(Duration::from_secs(5 * 60)); cfg.set_query_timeout(Duration::from_secs(5 * 60));
let store = MemoryStore::new(local_peer_id.clone()); let store = MemoryStore::new(local_peer_id);
let mut behaviour = Kademlia::with_config(local_peer_id.clone(), store, cfg); let mut behaviour = Kademlia::with_config(local_peer_id, store, cfg);
// Add the bootnodes to the local routing table. `libp2p-dns` built // Add the bootnodes to the local routing table. `libp2p-dns` built
// into the `transport` resolves the `dnsaddr` when Kademlia tries // into the `transport` resolves the `dnsaddr` when Kademlia tries
@ -91,11 +92,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Kick it off! // Kick it off!
task::block_on(async move { task::block_on(async move {
loop { loop {
let event = swarm.next().await; let event = swarm.select_next_some().await;
if let KademliaEvent::QueryResult { if let SwarmEvent::Behaviour(KademliaEvent::QueryResult {
result: QueryResult::GetClosestPeers(result), result: QueryResult::GetClosestPeers(result),
.. ..
} = event { }) = event {
match result { match result {
Ok(ok) => Ok(ok) =>
if !ok.peers.is_empty() { if !ok.peers.is_empty() {

View File

@ -44,7 +44,7 @@ use libp2p::{
noise, noise,
ping::{self, Ping, PingConfig, PingEvent}, ping::{self, Ping, PingConfig, PingEvent},
pnet::{PnetConfig, PreSharedKey}, pnet::{PnetConfig, PreSharedKey},
swarm::NetworkBehaviourEventProcess, swarm::{NetworkBehaviourEventProcess, SwarmEvent},
tcp::TcpConfig, tcp::TcpConfig,
yamux::YamuxConfig, yamux::YamuxConfig,
Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport, Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport,
@ -254,7 +254,7 @@ fn main() -> Result<(), Box<dyn Error>> {
println!("Subscribing to {:?}", gossipsub_topic); println!("Subscribing to {:?}", gossipsub_topic);
behaviour.gossipsub.subscribe(&gossipsub_topic).unwrap(); behaviour.gossipsub.subscribe(&gossipsub_topic).unwrap();
Swarm::new(transport, behaviour, local_peer_id.clone()) Swarm::new(transport, behaviour, local_peer_id)
}; };
// Reach out to other nodes if specified // Reach out to other nodes if specified
@ -271,7 +271,6 @@ fn main() -> Result<(), Box<dyn Error>> {
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off // Kick it off
let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop { loop {
if let Err(e) = match stdin.try_poll_next_unpin(cx)? { if let Err(e) = match stdin.try_poll_next_unpin(cx)? {
@ -287,17 +286,13 @@ fn main() -> Result<(), Box<dyn Error>> {
} }
loop { loop {
match swarm.poll_next_unpin(cx) { match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => println!("{:?}", event), Poll::Ready(Some(event)) => {
Poll::Ready(None) => return Poll::Ready(Ok(())), if let SwarmEvent::NewListenAddr(addr) = event {
Poll::Pending => { println!("Listening on {:?}", addr);
if !listening {
for addr in Swarm::listeners(&swarm) {
println!("Address {}/ipfs/{}", addr, local_peer_id);
listening = true;
}
} }
break;
} }
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
} }
} }
Poll::Pending Poll::Pending

View File

@ -18,7 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use libp2p::{identity, mdns::{Mdns, MdnsConfig, MdnsEvent}, PeerId, Swarm}; use futures::StreamExt;
use libp2p::{
identity,
mdns::{Mdns, MdnsConfig, MdnsEvent},
swarm::{Swarm, SwarmEvent},
PeerId
};
use std::error::Error; use std::error::Error;
#[async_std::main] #[async_std::main]
@ -43,17 +49,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
loop { loop {
match swarm.next().await { match swarm.select_next_some().await {
MdnsEvent::Discovered(peers) => { SwarmEvent::Behaviour(MdnsEvent::Discovered(peers)) => {
for (peer, addr) in peers { for (peer, addr) in peers {
println!("discovered {} {}", peer, addr); println!("discovered {} {}", peer, addr);
} }
} }
MdnsEvent::Expired(expired) => { SwarmEvent::Behaviour(MdnsEvent::Expired(expired)) => {
for (peer, addr) in expired { for (peer, addr) in expired {
println!("expired {} {}", peer, addr); println!("expired {} {}", peer, addr);
} }
} }
_ => {}
} }
} }
} }

View File

@ -43,7 +43,7 @@
use futures::executor::block_on; use futures::executor::block_on;
use futures::prelude::*; use futures::prelude::*;
use libp2p::ping::{Ping, PingConfig}; use libp2p::ping::{Ping, PingConfig};
use libp2p::swarm::Swarm; use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::{identity, PeerId}; use libp2p::{identity, PeerId};
use std::error::Error; use std::error::Error;
use std::task::Poll; use std::task::Poll;
@ -76,20 +76,15 @@ fn main() -> Result<(), Box<dyn Error>> {
println!("Dialed {}", addr) println!("Dialed {}", addr)
} }
let mut listening = false;
block_on(future::poll_fn(move |cx| loop { block_on(future::poll_fn(move |cx| loop {
match swarm.poll_next_unpin(cx) { match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => println!("{:?}", event), Poll::Ready(Some(event)) => match event {
SwarmEvent::NewListenAddr(addr) => println!("Listening on {:?}", addr),
SwarmEvent::Behaviour(event) => println!("{:?}", event),
_ => {}
},
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => { Poll::Pending => return Poll::Pending,
if !listening {
for addr in Swarm::listeners(&swarm) {
println!("Listening on {}", addr);
listening = true;
}
}
return Poll::Pending;
}
} }
})); }));

View File

@ -37,7 +37,7 @@ use libp2p_gossipsub::{
ValidationMode, ValidationMode,
}; };
use libp2p_plaintext::PlainText2Config; use libp2p_plaintext::PlainText2Config;
use libp2p_swarm::Swarm; use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_yamux as yamux; use libp2p_yamux as yamux;
struct Graph { struct Graph {
@ -49,10 +49,13 @@ impl Future for Graph {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
for (addr, node) in &mut self.nodes { for (addr, node) in &mut self.nodes {
match node.poll_next_unpin(cx) { loop {
Poll::Ready(Some(event)) => return Poll::Ready((addr.clone(), event)), match node.poll_next_unpin(cx) {
Poll::Ready(None) => panic!("unexpected None when polling nodes"), Poll::Ready(Some(SwarmEvent::Behaviour(event))) => return Poll::Ready((addr.clone(), event)),
Poll::Pending => {} Poll::Ready(Some(_)) => {}
Poll::Ready(None) => panic!("unexpected None when polling nodes"),
Poll::Pending => break,
}
} }
} }

View File

@ -495,7 +495,7 @@ mod tests {
let listen_addr = async_std::task::block_on(async { let listen_addr = async_std::task::block_on(async {
loop { loop {
let swarm1_fut = swarm1.next_event(); let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut); pin_mut!(swarm1_fut);
match swarm1_fut.await { match swarm1_fut.await {
SwarmEvent::NewListenAddr(addr) => return addr, SwarmEvent::NewListenAddr(addr) => return addr,
@ -511,13 +511,16 @@ mod tests {
// either `Identified` event arrives correctly. // either `Identified` event arrives correctly.
async_std::task::block_on(async move { async_std::task::block_on(async move {
loop { loop {
let swarm1_fut = swarm1.next(); let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut); pin_mut!(swarm1_fut);
let swarm2_fut = swarm2.next(); let swarm2_fut = swarm2.select_next_some();
pin_mut!(swarm2_fut); pin_mut!(swarm2_fut);
match future::select(swarm1_fut, swarm2_fut).await.factor_second().0 { match future::select(swarm1_fut, swarm2_fut).await.factor_second().0 {
future::Either::Left(IdentifyEvent::Received { info, .. }) => { future::Either::Left(SwarmEvent::Behaviour(IdentifyEvent::Received {
info,
..
})) => {
assert_eq!(info.public_key, pubkey2); assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "c"); assert_eq!(info.protocol_version, "c");
assert_eq!(info.agent_version, "d"); assert_eq!(info.agent_version, "d");
@ -525,7 +528,10 @@ mod tests {
assert!(info.listen_addrs.is_empty()); assert!(info.listen_addrs.is_empty());
return; return;
} }
future::Either::Right(IdentifyEvent::Received { info, .. }) => { future::Either::Right(SwarmEvent::Behaviour(IdentifyEvent::Received {
info,
..
})) => {
assert_eq!(info.public_key, pubkey1); assert_eq!(info.public_key, pubkey1);
assert_eq!(info.protocol_version, "a"); assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b"); assert_eq!(info.agent_version, "b");
@ -568,7 +574,7 @@ mod tests {
let listen_addr = async_std::task::block_on(async { let listen_addr = async_std::task::block_on(async {
loop { loop {
let swarm1_fut = swarm1.next_event(); let swarm1_fut = swarm1.select_next_some();
pin_mut!(swarm1_fut); pin_mut!(swarm1_fut);
match swarm1_fut.await { match swarm1_fut.await {
SwarmEvent::NewListenAddr(addr) => return addr, SwarmEvent::NewListenAddr(addr) => return addr,
@ -581,8 +587,8 @@ mod tests {
async_std::task::block_on(async move { async_std::task::block_on(async move {
loop { loop {
let swarm1_fut = swarm1.next_event(); let swarm1_fut = swarm1.select_next_some();
let swarm2_fut = swarm2.next_event(); let swarm2_fut = swarm2.select_next_some();
{ {
pin_mut!(swarm1_fut); pin_mut!(swarm1_fut);

View File

@ -42,7 +42,7 @@ use libp2p_core::{
multihash::{Code, Multihash, MultihashDigest}, multihash::{Code, Multihash, MultihashDigest},
}; };
use libp2p_noise as noise; use libp2p_noise as noise;
use libp2p_swarm::Swarm; use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_yamux as yamux; use libp2p_yamux as yamux;
use quickcheck::*; use quickcheck::*;
use rand::{Rng, random, thread_rng, rngs::StdRng, SeedableRng}; use rand::{Rng, random, thread_rng, rngs::StdRng, SeedableRng};
@ -185,9 +185,9 @@ fn bootstrap() {
for (i, swarm) in swarms.iter_mut().enumerate() { for (i, swarm) in swarms.iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
id, result: QueryResult::Bootstrap(Ok(ok)), .. id, result: QueryResult::Bootstrap(Ok(ok)), ..
})) => { }))) => {
assert_eq!(id, qid); assert_eq!(id, qid);
assert_eq!(i, 0); assert_eq!(i, 0);
if first { if first {
@ -265,9 +265,9 @@ fn query_iter() {
for (i, swarm) in swarms.iter_mut().enumerate() { for (i, swarm) in swarms.iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
id, result: QueryResult::GetClosestPeers(Ok(ok)), .. id, result: QueryResult::GetClosestPeers(Ok(ok)), ..
})) => { }))) => {
assert_eq!(id, qid); assert_eq!(id, qid);
assert_eq!(&ok.key[..], search_target.to_bytes().as_slice()); assert_eq!(&ok.key[..], search_target.to_bytes().as_slice());
assert_eq!(swarm_ids[i], expected_swarm_id); assert_eq!(swarm_ids[i], expected_swarm_id);
@ -318,9 +318,9 @@ fn unresponsive_not_returned_direct() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
result: QueryResult::GetClosestPeers(Ok(ok)), .. result: QueryResult::GetClosestPeers(Ok(ok)), ..
})) => { }))) => {
assert_eq!(&ok.key[..], search_target.to_bytes().as_slice()); assert_eq!(&ok.key[..], search_target.to_bytes().as_slice());
assert_eq!(ok.peers.len(), 0); assert_eq!(ok.peers.len(), 0);
return Poll::Ready(()); return Poll::Ready(());
@ -368,9 +368,9 @@ fn unresponsive_not_returned_indirect() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
result: QueryResult::GetClosestPeers(Ok(ok)), .. result: QueryResult::GetClosestPeers(Ok(ok)), ..
})) => { }))) => {
assert_eq!(&ok.key[..], search_target.to_bytes().as_slice()); assert_eq!(&ok.key[..], search_target.to_bytes().as_slice());
assert_eq!(ok.peers.len(), 1); assert_eq!(ok.peers.len(), 1);
assert_eq!(ok.peers[0], first_peer_id); assert_eq!(ok.peers[0], first_peer_id);
@ -412,9 +412,9 @@ fn get_record_not_found() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
id, result: QueryResult::GetRecord(Err(e)), .. id, result: QueryResult::GetRecord(Err(e)), ..
})) => { }))) => {
assert_eq!(id, qid); assert_eq!(id, qid);
if let GetRecordError::NotFound { key, closest_peers, } = e { if let GetRecordError::NotFound { key, closest_peers, } = e {
assert_eq!(key, target_key); assert_eq!(key, target_key);
@ -519,12 +519,12 @@ fn put_record() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
id, result: QueryResult::PutRecord(res), stats id, result: QueryResult::PutRecord(res), stats
})) | }))) |
Poll::Ready(Some(KademliaEvent::QueryResult { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
id, result: QueryResult::RepublishRecord(res), stats id, result: QueryResult::RepublishRecord(res), stats
})) => { }))) => {
assert!(qids.is_empty() || qids.remove(&id)); assert!(qids.is_empty() || qids.remove(&id));
assert!(stats.duration().is_some()); assert!(stats.duration().is_some());
assert!(stats.num_successes() >= replication_factor.get() as u32); assert!(stats.num_successes() >= replication_factor.get() as u32);
@ -652,13 +652,13 @@ fn get_record() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
id, id,
result: QueryResult::GetRecord(Ok(GetRecordOk { result: QueryResult::GetRecord(Ok(GetRecordOk {
records, cache_candidates records, cache_candidates
})), })),
.. ..
})) => { }))) => {
assert_eq!(id, qid); assert_eq!(id, qid);
assert_eq!(records.len(), 1); assert_eq!(records.len(), 1);
assert_eq!(records.first().unwrap().record, record); assert_eq!(records.first().unwrap().record, record);
@ -702,11 +702,11 @@ fn get_record_many() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
id, id,
result: QueryResult::GetRecord(Ok(GetRecordOk { records, .. })), result: QueryResult::GetRecord(Ok(GetRecordOk { records, .. })),
.. ..
})) => { }))) => {
assert_eq!(id, qid); assert_eq!(id, qid);
assert!(records.len() >= num_results); assert!(records.len() >= num_results);
assert!(records.into_iter().all(|r| r.record == record)); assert!(records.into_iter().all(|r| r.record == record));
@ -784,15 +784,15 @@ fn add_provider() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
id, result: QueryResult::StartProviding(res), .. id, result: QueryResult::StartProviding(res), ..
})) | }))) |
Poll::Ready(Some(KademliaEvent::QueryResult { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult {
id, result: QueryResult::RepublishProvider(res), .. id, result: QueryResult::RepublishProvider(res), ..
})) => { }))) => {
assert!(qids.is_empty() || qids.remove(&id)); assert!(qids.is_empty() || qids.remove(&id));
match res { match res {
Err(e) => panic!(e), Err(e) => panic!("{:?}", e),
Ok(ok) => { Ok(ok) => {
assert!(keys.contains(&ok.key)); assert!(keys.contains(&ok.key));
results.push(ok.key); results.push(ok.key);
@ -900,16 +900,18 @@ fn exceed_jobs_max_queries() {
poll_fn(move |ctx| { poll_fn(move |ctx| {
for _ in 0 .. num { for _ in 0 .. num {
// There are no other nodes, so the queries finish instantly. // There are no other nodes, so the queries finish instantly.
if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) { loop {
if let KademliaEvent::QueryResult { if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) {
result: QueryResult::GetClosestPeers(Ok(r)), .. match e {
} = e { SwarmEvent::Behaviour(KademliaEvent::QueryResult {
assert!(r.peers.is_empty()) result: QueryResult::GetClosestPeers(Ok(r)), ..
}) => break assert!(r.peers.is_empty()),
SwarmEvent::Behaviour(e) => panic!("Unexpected event: {:?}", e),
_ => {}
}
} else { } else {
panic!("Unexpected event: {:?}", e) panic!("Expected event")
} }
} else {
panic!("Expected event")
} }
} }
Poll::Ready(()) Poll::Ready(())
@ -970,10 +972,10 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
for (i, swarm) in [&mut alice, &mut trudy].iter_mut().enumerate() { for (i, swarm) in [&mut alice, &mut trudy].iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult{ Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult{
result: QueryResult::GetRecord(result), result: QueryResult::GetRecord(result),
.. ..
})) => { }))) => {
if i != 0 { if i != 0 {
panic!("Expected `QueryResult` from Alice.") panic!("Expected `QueryResult` from Alice.")
} }
@ -1023,10 +1025,10 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
for (i, swarm) in [&mut alice, &mut bob].iter_mut().enumerate() { for (i, swarm) in [&mut alice, &mut bob].iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult{ Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::QueryResult{
result: QueryResult::GetRecord(result), result: QueryResult::GetRecord(result),
.. ..
})) => { }))) => {
if i != 0 { if i != 0 {
panic!("Expected `QueryResult` from Alice.") panic!("Expected `QueryResult` from Alice.")
} }
@ -1087,9 +1089,9 @@ fn manual_bucket_inserts() {
for (_, swarm) in swarms.iter_mut() { for (_, swarm) in swarms.iter_mut() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::RoutablePeer { Poll::Ready(Some(SwarmEvent::Behaviour(KademliaEvent::RoutablePeer {
peer, address peer, address
})) => { }))) => {
assert_eq!(peer, expected.remove(&address).expect("Missing address")); assert_eq!(peer, expected.remove(&address).expect("Missing address"));
routable.push(peer); routable.push(peer);
if expected.is_empty() { if expected.is_empty() {

View File

@ -63,7 +63,7 @@ fn ping_pong() {
let peer1 = async move { let peer1 = async move {
loop { loop {
match swarm1.next_event().await { match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr(listener) => tx.send(listener).await.unwrap(), SwarmEvent::NewListenAddr(listener) => tx.send(listener).await.unwrap(),
SwarmEvent::Behaviour(PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) }) => { SwarmEvent::Behaviour(PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) }) => {
count1 -= 1; count1 -= 1;
@ -82,14 +82,20 @@ fn ping_pong() {
swarm2.dial_addr(rx.next().await.unwrap()).unwrap(); swarm2.dial_addr(rx.next().await.unwrap()).unwrap();
loop { loop {
match swarm2.next().await { match swarm2.select_next_some().await {
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { SwarmEvent::Behaviour(PingEvent {
peer,
result: Ok(PingSuccess::Ping { rtt })
}) => {
count2 -= 1; count2 -= 1;
if count2 == 0 { if count2 == 0 {
return (pid2.clone(), peer, rtt) return (pid2.clone(), peer, rtt)
} }
}, },
PingEvent { result: Err(e), .. } => panic!("Ping failure: {:?}", e), SwarmEvent::Behaviour(PingEvent {
result: Err(e),
..
}) => panic!("Ping failure: {:?}", e),
_ => {} _ => {}
} }
} }
@ -130,7 +136,7 @@ fn max_failures() {
let mut count1: u8 = 0; let mut count1: u8 = 0;
loop { loop {
match swarm1.next_event().await { match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr(listener) => tx.send(listener).await.unwrap(), SwarmEvent::NewListenAddr(listener) => tx.send(listener).await.unwrap(),
SwarmEvent::Behaviour(PingEvent { SwarmEvent::Behaviour(PingEvent {
result: Ok(PingSuccess::Ping { .. }), .. result: Ok(PingSuccess::Ping { .. }), ..
@ -156,7 +162,7 @@ fn max_failures() {
let mut count2: u8 = 0; let mut count2: u8 = 0;
loop { loop {
match swarm2.next_event().await { match swarm2.select_next_some().await {
SwarmEvent::Behaviour(PingEvent { SwarmEvent::Behaviour(PingEvent {
result: Ok(PingSuccess::Ping { .. }), .. result: Ok(PingSuccess::Ping { .. }), ..
}) => { }) => {

View File

@ -57,6 +57,7 @@ use libp2p::core::upgrade;
use libp2p::ping::{Ping, PingConfig, PingEvent}; use libp2p::ping::{Ping, PingConfig, PingEvent};
use libp2p::plaintext; use libp2p::plaintext;
use libp2p::relay::{Relay, RelayConfig}; use libp2p::relay::{Relay, RelayConfig};
use libp2p::swarm::SwarmEvent;
use libp2p::tcp::TcpConfig; use libp2p::tcp::TcpConfig;
use libp2p::Transport; use libp2p::Transport;
use libp2p::{identity, Multiaddr, NetworkBehaviour, PeerId, Swarm}; use libp2p::{identity, Multiaddr, NetworkBehaviour, PeerId, Swarm};
@ -128,21 +129,16 @@ fn main() -> Result<(), Box<dyn Error>> {
s => panic!("Unexpected argument {:?}", s), s => panic!("Unexpected argument {:?}", s),
} }
let mut listening = false;
block_on(futures::future::poll_fn(move |cx: &mut Context<'_>| { block_on(futures::future::poll_fn(move |cx: &mut Context<'_>| {
loop { loop {
match swarm.poll_next_unpin(cx) { match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => println!("{:?}", event), Poll::Ready(Some(event)) => {
Poll::Ready(None) => return Poll::Ready(Ok(())), if let SwarmEvent::NewListenAddr(addr) = event {
Poll::Pending => { println!("Listening on {:?}", addr);
if !listening {
for addr in Swarm::listeners(&swarm) {
println!("Listening on {:?}", addr);
listening = true;
}
} }
break;
} }
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => break,
} }
} }
Poll::Pending Poll::Pending

View File

@ -19,8 +19,8 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::executor::LocalPool; use futures::executor::LocalPool;
use futures::future::{poll_fn, FutureExt}; use futures::future::FutureExt;
use futures::stream::Stream; use futures::stream::{Stream, StreamExt};
use futures::task::Spawn; use futures::task::Spawn;
use libp2p::kad::record::store::MemoryStore; use libp2p::kad::record::store::MemoryStore;
use libp2p::NetworkBehaviour; use libp2p::NetworkBehaviour;
@ -77,13 +77,13 @@ fn src_connect_to_dst_listening_via_relay() {
pool.run_until(async { pool.run_until(async {
// Destination Node dialing Relay. // Destination Node dialing Relay.
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) => assert_eq!(peer_id, relay_peer_id), SwarmEvent::Dialing(peer_id) => assert_eq!(peer_id, relay_peer_id),
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
// Destination Node establishing connection to Relay. // Destination Node establishing connection to Relay.
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => { SwarmEvent::ConnectionEstablished { peer_id, .. } => {
assert_eq!(peer_id, relay_peer_id); assert_eq!(peer_id, relay_peer_id);
} }
@ -92,7 +92,7 @@ fn src_connect_to_dst_listening_via_relay() {
// Destination Node reporting listen address via relay. // Destination Node reporting listen address via relay.
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::NewListenAddr(addr) if addr == dst_listen_addr_via_relay => break, SwarmEvent::NewListenAddr(addr) if addr == dst_listen_addr_via_relay => break,
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {} SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated { SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
@ -105,7 +105,7 @@ fn src_connect_to_dst_listening_via_relay() {
let dst = async move { let dst = async move {
// Destination Node receiving connection from Source Node via Relay. // Destination Node receiving connection from Source Node via Relay.
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::IncomingConnection { send_back_addr, .. } => { SwarmEvent::IncomingConnection { send_back_addr, .. } => {
assert_eq!( assert_eq!(
send_back_addr, send_back_addr,
@ -120,7 +120,7 @@ fn src_connect_to_dst_listening_via_relay() {
// Destination Node establishing connection from Source Node via Relay. // Destination Node establishing connection from Source Node via Relay.
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == src_peer_id => { SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == src_peer_id => {
break; break;
} }
@ -131,7 +131,7 @@ fn src_connect_to_dst_listening_via_relay() {
// Destination Node waiting for Ping from Source Node via Relay. // Destination Node waiting for Ping from Source Node via Relay.
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {} SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::ConnectionClosed { peer_id, .. } => { SwarmEvent::ConnectionClosed { peer_id, .. } => {
assert_eq!(peer_id, src_peer_id); assert_eq!(peer_id, src_peer_id);
@ -145,20 +145,20 @@ fn src_connect_to_dst_listening_via_relay() {
src_swarm.dial_addr(dst_addr_via_relay).unwrap(); src_swarm.dial_addr(dst_addr_via_relay).unwrap();
let src = async move { let src = async move {
// Source Node dialing Relay to connect to Destination Node. // Source Node dialing Relay to connect to Destination Node.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
// Source Node establishing connection to Relay to connect to Destination Node. // Source Node establishing connection to Relay to connect to Destination Node.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
// Source Node establishing connection to destination node via Relay. // Source Node establishing connection to destination node via Relay.
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => { SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {
break break
} }
@ -169,7 +169,7 @@ fn src_connect_to_dst_listening_via_relay() {
// Source Node waiting for Ping from Destination Node via Relay. // Source Node waiting for Ping from Destination Node via Relay.
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::Behaviour(CombinedEvent::Ping(PingEvent { SwarmEvent::Behaviour(CombinedEvent::Ping(PingEvent {
peer, peer,
result: Ok(_), result: Ok(_),
@ -220,20 +220,20 @@ fn src_connect_to_dst_not_listening_via_active_relay() {
src_swarm.dial_addr(dst_addr_via_relay).unwrap(); src_swarm.dial_addr(dst_addr_via_relay).unwrap();
pool.run_until(async move { pool.run_until(async move {
// Source Node dialing Relay to connect to Destination Node. // Source Node dialing Relay to connect to Destination Node.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
// Source Node establishing connection to Relay to connect to Destination Node. // Source Node establishing connection to Relay to connect to Destination Node.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
// Source Node establishing connection to destination node via Relay. // Source Node establishing connection to destination node via Relay.
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => { SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {
break break
} }
@ -244,7 +244,7 @@ fn src_connect_to_dst_not_listening_via_active_relay() {
// Source Node waiting for Ping from Destination Node via Relay. // Source Node waiting for Ping from Destination Node via Relay.
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::Behaviour(CombinedEvent::Ping(PingEvent { SwarmEvent::Behaviour(CombinedEvent::Ping(PingEvent {
peer, peer,
result: Ok(_), result: Ok(_),
@ -286,7 +286,7 @@ fn src_connect_to_dst_via_established_connection_to_relay() {
// Wait for destination to listen via relay. // Wait for destination to listen via relay.
pool.run_until(async { pool.run_until(async {
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::Dialing(_) => {} SwarmEvent::Dialing(_) => {}
SwarmEvent::ConnectionEstablished { .. } => {} SwarmEvent::ConnectionEstablished { .. } => {}
SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break, SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break,
@ -301,7 +301,7 @@ fn src_connect_to_dst_via_established_connection_to_relay() {
// Source Node establishing connection to Relay. // Source Node establishing connection to Relay.
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => { SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {
break break
@ -315,7 +315,7 @@ fn src_connect_to_dst_via_established_connection_to_relay() {
// Source Node establishing connection to destination node via Relay. // Source Node establishing connection to destination node via Relay.
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => { SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {
break break
} }
@ -326,7 +326,7 @@ fn src_connect_to_dst_via_established_connection_to_relay() {
// Source Node waiting for Ping from Destination Node via Relay. // Source Node waiting for Ping from Destination Node via Relay.
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::Behaviour(CombinedEvent::Ping(PingEvent { SwarmEvent::Behaviour(CombinedEvent::Ping(PingEvent {
peer, peer,
result: Ok(_), result: Ok(_),
@ -368,19 +368,19 @@ fn src_try_connect_to_offline_dst() {
src_swarm.dial_addr(dst_addr_via_relay.clone()).unwrap(); src_swarm.dial_addr(dst_addr_via_relay.clone()).unwrap();
pool.run_until(async move { pool.run_until(async move {
// Source Node dialing Relay to connect to Destination Node. // Source Node dialing Relay to connect to Destination Node.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
// Source Node establishing connection to Relay to connect to Destination Node. // Source Node establishing connection to Relay to connect to Destination Node.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::UnreachableAddr { address, peer_id, .. } SwarmEvent::UnreachableAddr { address, peer_id, .. }
if address == dst_addr_via_relay => if address == dst_addr_via_relay =>
{ {
@ -425,19 +425,19 @@ fn src_try_connect_to_unsupported_dst() {
src_swarm.dial_addr(dst_addr_via_relay.clone()).unwrap(); src_swarm.dial_addr(dst_addr_via_relay.clone()).unwrap();
pool.run_until(async move { pool.run_until(async move {
// Source Node dialing Relay to connect to Destination Node. // Source Node dialing Relay to connect to Destination Node.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
// Source Node establishing connection to Relay to connect to Destination Node. // Source Node establishing connection to Relay to connect to Destination Node.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::UnreachableAddr { address, peer_id, .. } SwarmEvent::UnreachableAddr { address, peer_id, .. }
if address == dst_addr_via_relay => if address == dst_addr_via_relay =>
{ {
@ -475,19 +475,19 @@ fn src_try_connect_to_offline_dst_via_offline_relay() {
src_swarm.dial_addr(dst_addr_via_relay.clone()).unwrap(); src_swarm.dial_addr(dst_addr_via_relay.clone()).unwrap();
pool.run_until(async move { pool.run_until(async move {
// Source Node dialing Relay to connect to Destination Node. // Source Node dialing Relay to connect to Destination Node.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
// Source Node fail to reach Relay. // Source Node fail to reach Relay.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::UnreachableAddr { peer_id, .. } if peer_id == relay_peer_id => {} SwarmEvent::UnreachableAddr { peer_id, .. } if peer_id == relay_peer_id => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
// Source Node fail to reach Destination Node due to failure reaching Relay. // Source Node fail to reach Destination Node due to failure reaching Relay.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::UnreachableAddr { address, peer_id, .. } SwarmEvent::UnreachableAddr { address, peer_id, .. }
if address == dst_addr_via_relay => { if address == dst_addr_via_relay => {
assert_eq!(peer_id, dst_peer_id); assert_eq!(peer_id, dst_peer_id);
@ -535,14 +535,14 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
pool.run_until(async { pool.run_until(async {
// Destination Node dialing Relay. // Destination Node dialing Relay.
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) => assert_eq!(peer_id, relay_peer_id), SwarmEvent::Dialing(peer_id) => assert_eq!(peer_id, relay_peer_id),
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
// Destination Node establishing connection to Relay. // Destination Node establishing connection to Relay.
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => { SwarmEvent::ConnectionEstablished { peer_id, .. } => {
assert_eq!(peer_id, relay_peer_id); assert_eq!(peer_id, relay_peer_id);
break; break;
@ -555,7 +555,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
// Destination Node reporting listen address via relay. // Destination Node reporting listen address via relay.
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break, SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break,
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {} SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated { SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
@ -568,7 +568,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
// Destination Node bootstrapping. // Destination Node bootstrapping.
let query_id = dst_swarm.behaviour_mut().kad.bootstrap().unwrap(); let query_id = dst_swarm.behaviour_mut().kad.bootstrap().unwrap();
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::QueryResult { SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::QueryResult {
id, id,
result: QueryResult::Bootstrap(Ok(_)), result: QueryResult::Bootstrap(Ok(_)),
@ -589,7 +589,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
let dst = async move { let dst = async move {
// Destination Node receiving connection from Source Node via Relay. // Destination Node receiving connection from Source Node via Relay.
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::IncomingConnection { send_back_addr, .. } => { SwarmEvent::IncomingConnection { send_back_addr, .. } => {
assert_eq!(send_back_addr, Protocol::P2p(src_peer_id.into()).into()); assert_eq!(send_back_addr, Protocol::P2p(src_peer_id.into()).into());
break; break;
@ -601,7 +601,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
// Destination Node establishing connection from Source Node via Relay. // Destination Node establishing connection from Source Node via Relay.
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == src_peer_id => { SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == src_peer_id => {
break; break;
} }
@ -612,7 +612,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
// Destination Node waiting for Source Node to close connection. // Destination Node waiting for Source Node to close connection.
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {} SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == relay_peer_id => {} SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == src_peer_id => { SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == src_peer_id => {
@ -634,7 +634,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
// Source Node establishing connection to Relay and, given that the DHT is small, Node // Source Node establishing connection to Relay and, given that the DHT is small, Node
// B. // B.
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => { SwarmEvent::ConnectionEstablished { peer_id, .. } => {
if peer_id == relay_peer_id { if peer_id == relay_peer_id {
continue; continue;
@ -668,7 +668,7 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl
// Source Node waiting for Ping from Destination Node via Relay. // Source Node waiting for Ping from Destination Node via Relay.
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::Behaviour(CombinedEvent::Ping(PingEvent { SwarmEvent::Behaviour(CombinedEvent::Ping(PingEvent {
peer, peer,
result: Ok(_), result: Ok(_),
@ -719,7 +719,7 @@ fn inactive_connection_timeout() {
// Wait for destination to listen via relay. // Wait for destination to listen via relay.
pool.run_until(async { pool.run_until(async {
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::Dialing(_) => {} SwarmEvent::Dialing(_) => {}
SwarmEvent::ConnectionEstablished { .. } => {} SwarmEvent::ConnectionEstablished { .. } => {}
SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break, SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break,
@ -733,7 +733,7 @@ fn inactive_connection_timeout() {
src_swarm.dial_addr(relay_addr).unwrap(); src_swarm.dial_addr(relay_addr).unwrap();
// Source Node dialing Relay. // Source Node dialing Relay.
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {} SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => { SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {
break; break;
@ -745,7 +745,7 @@ fn inactive_connection_timeout() {
src_swarm.dial_addr(dst_addr_via_relay).unwrap(); src_swarm.dial_addr(dst_addr_via_relay).unwrap();
// Source Node establishing connection to destination node via Relay. // Source Node establishing connection to destination node via Relay.
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {}
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
@ -754,7 +754,7 @@ fn inactive_connection_timeout() {
// relayed connection and eventually also close the connection to Source Node given that no // relayed connection and eventually also close the connection to Source Node given that no
// connections are being relayed on the connection. // connections are being relayed on the connection.
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::ConnectionClosed { peer_id, .. } => { SwarmEvent::ConnectionClosed { peer_id, .. } => {
if peer_id == relay_peer_id { if peer_id == relay_peer_id {
break; break;
@ -793,7 +793,7 @@ fn concurrent_connection_same_relay_same_dst() {
// Wait for destination to listen via relay. // Wait for destination to listen via relay.
pool.run_until(async { pool.run_until(async {
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::Dialing(_) => {} SwarmEvent::Dialing(_) => {}
SwarmEvent::ConnectionEstablished { .. } => {} SwarmEvent::ConnectionEstablished { .. } => {}
SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break, SwarmEvent::NewListenAddr(addr) if addr == dst_addr_via_relay => break,
@ -810,7 +810,7 @@ fn concurrent_connection_same_relay_same_dst() {
// Source Node establishing two connections to destination node via Relay. // Source Node establishing two connections to destination node via Relay.
let mut num_established = 0; let mut num_established = 0;
loop { loop {
match src_swarm.next_event().await { match src_swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {} SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => { SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {
num_established += 1; num_established += 1;
@ -911,9 +911,9 @@ fn yield_incoming_connection_through_correct_listener() {
// Wait for destination node to establish connections to relay 1 and 2. // Wait for destination node to establish connections to relay 1 and 2.
pool.run_until(async { pool.run_until(async {
let mut established = 0; let mut established = 0u8;
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::Dialing(peer_id) SwarmEvent::Dialing(peer_id)
if peer_id == relay_1_peer_id || peer_id == relay_2_peer_id => {} if peer_id == relay_1_peer_id || peer_id == relay_2_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } SwarmEvent::ConnectionEstablished { peer_id, .. }
@ -946,7 +946,7 @@ fn yield_incoming_connection_through_correct_listener() {
let mut src_1_ping = false; let mut src_1_ping = false;
let mut src_2_ping = false; let mut src_2_ping = false;
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::IncomingConnection { .. } => {} SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::ConnectionEstablished { SwarmEvent::ConnectionEstablished {
peer_id, endpoint, .. peer_id, endpoint, ..
@ -995,56 +995,55 @@ fn yield_incoming_connection_through_correct_listener() {
// Expect destination node to reject incoming connection from unknown relay given that // Expect destination node to reject incoming connection from unknown relay given that
// destination node is not listening for such connections. // destination node is not listening for such connections.
src_3_swarm.dial_addr(dst_addr_via_relay_3.clone()).unwrap(); src_3_swarm.dial_addr(dst_addr_via_relay_3.clone()).unwrap();
pool.run_until(poll_fn(|cx| { pool.run_until(async {
match dst_swarm.next_event().boxed().poll_unpin(cx) { loop {
Poll::Ready(SwarmEvent::Behaviour(CombinedEvent::Ping(_))) => {} futures::select! {
Poll::Ready(SwarmEvent::IncomingConnection { .. }) => {} event = dst_swarm.select_next_some() => match event {
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, .. }) SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
if peer_id == relay_3_peer_id => {} SwarmEvent::IncomingConnection { .. } => {}
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, .. }) SwarmEvent::ConnectionEstablished { peer_id, .. }
if peer_id == src_3_peer_id => if peer_id == relay_3_peer_id => {}
{ SwarmEvent::ConnectionEstablished { peer_id, .. }
panic!( if peer_id == src_3_peer_id =>
"Expected destination node to reject incoming connection from unknown relay \ {
without a catch-all listener", panic!(
); "Expected destination node to reject incoming connection from unknown relay \
without a catch-all listener",
);
}
e => panic!("{:?}", e),
},
event = src_3_swarm.select_next_some() => match event {
SwarmEvent::UnreachableAddr { address, peer_id, .. }
if address == dst_addr_via_relay_3 =>
{
assert_eq!(peer_id, dst_peer_id);
break;
}
SwarmEvent::Dialing { .. } => {}
SwarmEvent::ConnectionEstablished { peer_id, .. }
if peer_id == relay_3_peer_id => {}
SwarmEvent::ConnectionEstablished { peer_id, .. }
if peer_id == dst_peer_id =>
{
panic!(
"Expected destination node to reject incoming connection from unknown relay \
without a catch-all listener",
);
}
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
e => panic!("{:?}", e),
}
} }
Poll::Pending => {}
e => panic!("{:?}", e),
} }
});
match src_3_swarm.next_event().boxed().poll_unpin(cx) {
Poll::Ready(SwarmEvent::UnreachableAddr { address, peer_id, .. })
if address == dst_addr_via_relay_3 =>
{
assert_eq!(peer_id, dst_peer_id);
return Poll::Ready(());
}
Poll::Ready(SwarmEvent::Dialing { .. }) => {}
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, .. })
if peer_id == relay_3_peer_id => {}
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, .. })
if peer_id == dst_peer_id =>
{
panic!(
"Expected destination node to reject incoming connection from unknown relay \
without a catch-all listener",
);
}
Poll::Ready(SwarmEvent::Behaviour(CombinedEvent::Ping(_))) => {}
Poll::Pending => {}
e => panic!("{:?}", e),
}
Poll::Pending
}));
// Instruct destination node to listen for incoming relayed connections from unknown relay nodes. // Instruct destination node to listen for incoming relayed connections from unknown relay nodes.
dst_swarm.listen_on(Protocol::P2pCircuit.into()).unwrap(); dst_swarm.listen_on(Protocol::P2pCircuit.into()).unwrap();
// Wait for destination node to report new listen address. // Wait for destination node to report new listen address.
pool.run_until(async { pool.run_until(async {
loop { loop {
match dst_swarm.next_event().await { match dst_swarm.select_next_some().await {
SwarmEvent::NewListenAddr(addr) if addr == Protocol::P2pCircuit.into() => break, SwarmEvent::NewListenAddr(addr) if addr == Protocol::P2pCircuit.into() => break,
SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {} SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {}
SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated { SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated {
@ -1061,7 +1060,7 @@ fn yield_incoming_connection_through_correct_listener() {
src_3_swarm.dial_addr(dst_addr_via_relay_3.clone()).unwrap(); src_3_swarm.dial_addr(dst_addr_via_relay_3.clone()).unwrap();
pool.run_until(async move { pool.run_until(async move {
loop { loop {
match src_3_swarm.next_event().await { match src_3_swarm.select_next_some().await {
SwarmEvent::Dialing(_) => {} SwarmEvent::Dialing(_) => {}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => { SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {
break break
@ -1459,7 +1458,7 @@ fn spawn_swarm_on_pool<B: NetworkBehaviour>(pool: &LocalPool, mut swarm: Swarm<B
.spawn_obj( .spawn_obj(
async move { async move {
loop { loop {
swarm.next_event().await; swarm.next().await;
} }
} }
.boxed() .boxed()

View File

@ -33,7 +33,7 @@ use libp2p_noise::{NoiseConfig, X25519Spec, Keypair};
use libp2p_request_response::*; use libp2p_request_response::*;
use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_tcp::TcpConfig; use libp2p_tcp::TcpConfig;
use futures::{prelude::*, channel::mpsc, executor::LocalPool, task::SpawnExt}; use futures::{channel::mpsc, executor::LocalPool, prelude::*, task::SpawnExt};
use rand::{self, Rng}; use rand::{self, Rng};
use std::{io, iter}; use std::{io, iter};
use std::{collections::HashSet, num::NonZeroU16}; use std::{collections::HashSet, num::NonZeroU16};
@ -52,8 +52,12 @@ fn is_response_outbound() {
let request_id1 = swarm1.behaviour_mut().send_request(&offline_peer, ping.clone()); let request_id1 = swarm1.behaviour_mut().send_request(&offline_peer, ping.clone());
match futures::executor::block_on(swarm1.next()) { match futures::executor::block_on(swarm1.select_next_some()) {
RequestResponseEvent::OutboundFailure{peer, request_id: req_id, error: _error} => { SwarmEvent::Behaviour(RequestResponseEvent::OutboundFailure{
peer,
request_id: req_id,
error: _error
}) => {
assert_eq!(&offline_peer, &peer); assert_eq!(&offline_peer, &peer);
assert_eq!(req_id, request_id1); assert_eq!(req_id, request_id1);
}, },
@ -93,7 +97,7 @@ fn ping_protocol() {
let peer1 = async move { let peer1 = async move {
loop { loop {
match swarm1.next_event().await { match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr(addr) => tx.send(addr).await.unwrap(), SwarmEvent::NewListenAddr(addr) => tx.send(addr).await.unwrap(),
SwarmEvent::Behaviour(RequestResponseEvent::Message { SwarmEvent::Behaviour(RequestResponseEvent::Message {
peer, peer,
@ -124,11 +128,11 @@ fn ping_protocol() {
assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id)); assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id));
loop { loop {
match swarm2.next().await { match swarm2.select_next_some().await {
RequestResponseEvent::Message { SwarmEvent::Behaviour(RequestResponseEvent::Message {
peer, peer,
message: RequestResponseMessage::Response { request_id, response } message: RequestResponseMessage::Response { request_id, response }
} => { }) => {
count += 1; count += 1;
assert_eq!(&response, &expected_pong); assert_eq!(&response, &expected_pong);
assert_eq!(&peer, &peer1_id); assert_eq!(&peer, &peer1_id);
@ -138,8 +142,10 @@ fn ping_protocol() {
} else { } else {
req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
} }
},
e => panic!("Peer2: Unexpected event: {:?}", e) }
SwarmEvent::Behaviour(e) =>panic!("Peer2: Unexpected event: {:?}", e),
_ => {}
} }
} }
}; };
@ -176,27 +182,38 @@ fn emits_inbound_connection_closed_failure() {
// Wait for swarm 1 to receive request by swarm 2. // Wait for swarm 1 to receive request by swarm 2.
let _channel = loop { let _channel = loop {
futures::select!( futures::select!(
event = swarm1.next().fuse() => match event { event = swarm1.select_next_some() => match event {
RequestResponseEvent::Message { SwarmEvent::Behaviour(RequestResponseEvent::Message {
peer, peer,
message: RequestResponseMessage::Request { request, channel, .. } message: RequestResponseMessage::Request { request, channel, .. }
} => { }) => {
assert_eq!(&request, &ping); assert_eq!(&request, &ping);
assert_eq!(&peer, &peer2_id); assert_eq!(&peer, &peer2_id);
break channel; break channel;
}, },
e => panic!("Peer1: Unexpected event: {:?}", e) SwarmEvent::Behaviour(ev) => panic!("Peer1: Unexpected event: {:?}", ev),
_ => {}
}, },
event = swarm2.next().fuse() => panic!("Peer2: Unexpected event: {:?}", event), event = swarm2.select_next_some() => {
if let SwarmEvent::Behaviour(ev) = event {
panic!("Peer2: Unexpected event: {:?}", ev);
}
}
) )
}; };
// Drop swarm 2 in order for the connection between swarm 1 and 2 to close. // Drop swarm 2 in order for the connection between swarm 1 and 2 to close.
drop(swarm2); drop(swarm2);
match swarm1.next().await { loop {
RequestResponseEvent::InboundFailure { error: InboundFailure::ConnectionClosed, ..} => {}, match swarm1.select_next_some().await {
e => panic!("Peer1: Unexpected event: {:?}", e) SwarmEvent::Behaviour(RequestResponseEvent::InboundFailure {
error: InboundFailure::ConnectionClosed,
..
}) => break,
SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {:?}", e),
_ => {}
}
} }
}); });
} }
@ -234,18 +251,22 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() {
// Wait for swarm 1 to receive request by swarm 2. // Wait for swarm 1 to receive request by swarm 2.
let event = loop { let event = loop {
futures::select!( futures::select!(
event = swarm1.next().fuse() => if let RequestResponseEvent::Message { event = swarm1.select_next_some() => {
peer, if let SwarmEvent::Behaviour(RequestResponseEvent::Message {
message: RequestResponseMessage::Request { request, channel, .. } peer,
} = event { message: RequestResponseMessage::Request { request, channel, .. }
assert_eq!(&request, &ping); }) = event {
assert_eq!(&peer, &peer2_id); assert_eq!(&request, &ping);
assert_eq!(&peer, &peer2_id);
drop(channel); drop(channel);
continue; continue;
}
}, },
event = swarm2.next().fuse() => { event = swarm2.select_next_some() => {
break event; if let SwarmEvent::Behaviour(ev) = event {
break ev;
}
}, },
) )
}; };
@ -290,7 +311,7 @@ fn ping_protocol_throttled() {
let peer1 = async move { let peer1 = async move {
for i in 1 .. { for i in 1 .. {
match swarm1.next_event().await { match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr(addr) => tx.send(addr).await.unwrap(), SwarmEvent::NewListenAddr(addr) => tx.send(addr).await.unwrap(),
SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::Message { SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::Message {
peer, peer,
@ -332,15 +353,15 @@ fn ping_protocol_throttled() {
} }
blocked = true; blocked = true;
} }
match swarm2.next().await { match swarm2.select_next_some().await {
throttled::Event::ResumeSending(peer) => { SwarmEvent::Behaviour(throttled::Event::ResumeSending(peer)) => {
assert_eq!(peer, peer1_id); assert_eq!(peer, peer1_id);
blocked = false blocked = false
} }
throttled::Event::Event(RequestResponseEvent::Message { SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::Message {
peer, peer,
message: RequestResponseMessage::Response { request_id, response } message: RequestResponseMessage::Response { request_id, response }
}) => { })) => {
count += 1; count += 1;
assert_eq!(&response, &expected_pong); assert_eq!(&response, &expected_pong);
assert_eq!(&peer, &peer1_id); assert_eq!(&peer, &peer1_id);
@ -349,8 +370,8 @@ fn ping_protocol_throttled() {
break break
} }
} }
e => panic!("Peer2: Unexpected event: {:?}", e) SwarmEvent::Behaviour(e) =>panic!("Peer2: Unexpected event: {:?}", e),
_ => {}
} }
} }
}; };

View File

@ -262,7 +262,7 @@
//! use futures::executor::block_on; //! use futures::executor::block_on;
//! use futures::prelude::*; //! use futures::prelude::*;
//! use libp2p::ping::{Ping, PingConfig}; //! use libp2p::ping::{Ping, PingConfig};
//! use libp2p::swarm::Swarm; //! use libp2p::swarm::{Swarm, SwarmEvent};
//! use libp2p::{identity, PeerId}; //! use libp2p::{identity, PeerId};
//! use std::error::Error; //! use std::error::Error;
//! use std::task::Poll; //! use std::task::Poll;
@ -295,20 +295,15 @@
//! println!("Dialed {}", addr) //! println!("Dialed {}", addr)
//! } //! }
//! //!
//! let mut listening = false;
//! block_on(future::poll_fn(move |cx| loop { //! block_on(future::poll_fn(move |cx| loop {
//! match swarm.poll_next_unpin(cx) { //! match swarm.poll_next_unpin(cx) {
//! Poll::Ready(Some(event)) => println!("{:?}", event), //! Poll::Ready(Some(event)) => {
//! Poll::Ready(None) => return Poll::Ready(()), //! if let SwarmEvent::NewListenAddr(addr) = event {
//! Poll::Pending => { //! println!("Listening on {:?}", addr);
//! if !listening {
//! for addr in Swarm::listeners(&swarm) {
//! println!("Listening on {}", addr);
//! listening = true;
//! }
//! } //! }
//! return Poll::Pending; //! },
//! } //! Poll::Ready(None) => return Poll::Ready(()),
//! Poll::Pending => return Poll::Pending
//! } //! }
//! })); //! }));
//! //!

View File

@ -18,3 +18,4 @@ quote = "1.0"
[dev-dependencies] [dev-dependencies]
libp2p = { path = "../" } libp2p = { path = "../" }
futures = "0.3.1"

View File

@ -18,6 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use libp2p::swarm::SwarmEvent;
use libp2p_swarm_derive::*; use libp2p_swarm_derive::*;
/// Small utility to check that a type implements `NetworkBehaviour`. /// Small utility to check that a type implements `NetworkBehaviour`.
@ -299,9 +301,12 @@ fn event_process_false() {
// check that the event is bubbled up all the way to swarm // check that the event is bubbled up all the way to swarm
let _ = async { let _ = async {
match _swarm.next().await { loop {
BehaviourOutEvent::Ping(_) => {}, match _swarm.select_next_some().await {
BehaviourOutEvent::Identify(_) => {}, SwarmEvent::Behaviour(BehaviourOutEvent::Ping(_)) => break,
SwarmEvent::Behaviour(BehaviourOutEvent::Identify(_)) => break,
_ => {}
}
} }
}; };
} }

View File

@ -2,6 +2,21 @@
- Update dependencies. - Update dependencies.
- Drive `ExpandedSwarm` via `Stream` trait only.
- Change `Stream` implementation of `ExpandedSwarm` to return all
`SwarmEvents` instead of only the `NetworkBehaviour`'s events.
- Remove `ExpandedSwarm::next_event`. Users can use `<ExpandedSwarm as
StreamExt>::next` instead.
- Remove `ExpandedSwarm::next`. Users can use `<ExpandedSwarm as
StreamExt>::filter_map` instead.
See [PR 2100] for details.
[PR 2100]: https://github.com/libp2p/rust-libp2p/pull/2100
# 0.29.0 [2021-04-13] # 0.29.0 [2021-04-13]
- Remove `Deref` and `DerefMut` implementations previously dereferencing to the - Remove `Deref` and `DerefMut` implementations previously dereferencing to the

View File

@ -256,6 +256,9 @@ pub enum SwarmEvent<TBvEv, THandleErr> {
} }
/// Contains the state of the network, plus the way it should behave. /// Contains the state of the network, plus the way it should behave.
///
/// Note: Needs to be polled via `<ExpandedSwarm as Stream>` in order to make
/// progress.
pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler> pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where where
THandler: IntoProtocolsHandler, THandler: IntoProtocolsHandler,
@ -475,25 +478,6 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
&mut self.behaviour &mut self.behaviour
} }
/// Returns the next event that happens in the `Swarm`.
///
/// Includes events from the `NetworkBehaviour` but also events about the connections status.
pub async fn next_event(&mut self) -> SwarmEvent<TBehaviour::OutEvent, THandleErr> {
future::poll_fn(move |cx| ExpandedSwarm::poll_next_event(Pin::new(self), cx)).await
}
/// Returns the next event produced by the [`NetworkBehaviour`].
pub async fn next(&mut self) -> TBehaviour::OutEvent {
future::poll_fn(move |cx| {
loop {
let event = futures::ready!(ExpandedSwarm::poll_next_event(Pin::new(self), cx));
if let SwarmEvent::Behaviour(event) = event {
return Poll::Ready(event);
}
}
}).await
}
/// Internal function used by everything event-related. /// Internal function used by everything event-related.
/// ///
/// Polls the `Swarm` for the next event. /// Polls the `Swarm` for the next event.
@ -841,27 +825,33 @@ where
}) })
} }
impl<TBehaviour, TInEvent, TOutEvent, THandler> Stream for /// Stream of events returned by [`ExpandedSwarm`].
///
/// Includes events from the [`NetworkBehaviour`] as well as events about
/// connection and listener status. See [`SwarmEvent`] for details.
///
/// Note: This stream is infinite and it is guaranteed that
/// [`Stream::poll_next`] will never return `Poll::Ready(None)`.
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr> Stream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler> ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>, where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static, THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Send + 'static, TInEvent: Send + 'static,
TOutEvent: Send + 'static, TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>, THandler::Handler:
ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
THandleErr: error::Error + Send + 'static,
{ {
type Item = TBehaviour::OutEvent; type Item = SwarmEvent<TBehaviour::OutEvent, THandleErr>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop { self.as_mut()
let event = futures::ready!(ExpandedSwarm::poll_next_event(self.as_mut(), cx)); .poll_next_event(cx)
if let SwarmEvent::Behaviour(event) = event { .map(Some)
return Poll::Ready(Some(event));
}
}
} }
} }
/// the stream of behaviour events never terminates, so we can implement fused for it /// The stream of swarm events never terminates, so we can implement fused for it.
impl<TBehaviour, TInEvent, TOutEvent, THandler> FusedStream for impl<TBehaviour, TInEvent, TOutEvent, THandler> FusedStream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler> ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>, where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,