Add Swarm::next_extended (#1374)

* Add Swarm::next_extended

* Fix ipfs-kad example

* Fix tests

* Renames
This commit is contained in:
Pierre Krieger
2020-01-07 11:57:00 +01:00
committed by GitHub
parent 56ca671376
commit 84b6a7d04d
5 changed files with 239 additions and 155 deletions

View File

@ -24,7 +24,6 @@
//! peer ID will be generated randomly. //! peer ID will be generated randomly.
use async_std::task; use async_std::task;
use futures::prelude::*;
use libp2p::{ use libp2p::{
Swarm, Swarm,
PeerId, PeerId,
@ -90,7 +89,8 @@ fn main() -> Result<(), Box<dyn Error>> {
// Kick it off! // Kick it off!
task::block_on(async move { task::block_on(async move {
while let Some(event) = swarm.try_next().await? { loop {
let event = swarm.next().await;
if let KademliaEvent::GetClosestPeersResult(result) = event { if let KademliaEvent::GetClosestPeersResult(result) = event {
match result { match result {
Ok(ok) => Ok(ok) =>

View File

@ -254,20 +254,18 @@ pub enum IdentifyEvent {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{Identify, IdentifyEvent}; use crate::{Identify, IdentifyEvent};
use futures::prelude::*; use futures::{prelude::*, pin_mut};
use libp2p_core::{ use libp2p_core::{
identity, identity,
PeerId, PeerId,
muxing::StreamMuxer, muxing::StreamMuxer,
Multiaddr,
Transport, Transport,
upgrade upgrade
}; };
use libp2p_tcp::TcpConfig; use libp2p_tcp::TcpConfig;
use libp2p_secio::SecioConfig; use libp2p_secio::SecioConfig;
use libp2p_swarm::Swarm; use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_mplex::MplexConfig; use libp2p_mplex::MplexConfig;
use rand::{Rng, thread_rng};
use std::{fmt, io}; use std::{fmt, io};
fn transport() -> (identity::PublicKey, impl Transport< fn transport() -> (identity::PublicKey, impl Transport<
@ -303,13 +301,19 @@ mod tests {
(swarm, pubkey) (swarm, pubkey)
}; };
let addr: Multiaddr = { Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let port = thread_rng().gen_range(49152, std::u16::MAX);
format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap()
};
Swarm::listen_on(&mut swarm1, addr.clone()).unwrap(); let listen_addr = async_std::task::block_on(async {
Swarm::dial_addr(&mut swarm2, addr.clone()).unwrap(); loop {
let swarm1_fut = swarm1.next_event();
pin_mut!(swarm1_fut);
match swarm1_fut.await {
SwarmEvent::NewListenAddr(addr) => return addr,
_ => {}
}
}
});
Swarm::dial_addr(&mut swarm2, listen_addr).unwrap();
// nb. Either swarm may receive the `Identified` event first, upon which // nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by // it will permit the connection to be closed, as defined by
@ -317,8 +321,13 @@ mod tests {
// either `Identified` event arrives correctly. // either `Identified` event arrives correctly.
async_std::task::block_on(async move { async_std::task::block_on(async move {
loop { loop {
match future::select(swarm1.next(), swarm2.next()).await.factor_second().0 { let swarm1_fut = swarm1.next();
future::Either::Left(Some(Ok(IdentifyEvent::Received { info, .. }))) => { pin_mut!(swarm1_fut);
let swarm2_fut = swarm2.next();
pin_mut!(swarm2_fut);
match future::select(swarm1_fut, swarm2_fut).await.factor_second().0 {
future::Either::Left(IdentifyEvent::Received { info, .. }) => {
assert_eq!(info.public_key, pubkey2); assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "c"); assert_eq!(info.protocol_version, "c");
assert_eq!(info.agent_version, "d"); assert_eq!(info.agent_version, "d");
@ -326,7 +335,7 @@ mod tests {
assert!(info.listen_addrs.is_empty()); assert!(info.listen_addrs.is_empty());
return; return;
} }
future::Either::Right(Some(Ok(IdentifyEvent::Received { info, .. }))) => { future::Either::Right(IdentifyEvent::Received { info, .. }) => {
assert_eq!(info.public_key, pubkey1); assert_eq!(info.public_key, pubkey1);
assert_eq!(info.protocol_version, "a"); assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b"); assert_eq!(info.agent_version, "b");

View File

@ -128,7 +128,7 @@ fn bootstrap() {
for (i, swarm) in swarms.iter_mut().enumerate() { for (i, swarm) in swarms.iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::BootstrapResult(Ok(ok))))) => { Poll::Ready(Some(KademliaEvent::BootstrapResult(Ok(ok)))) => {
assert_eq!(i, 0); assert_eq!(i, 0);
assert_eq!(ok.peer, swarm_ids[0]); assert_eq!(ok.peer, swarm_ids[0]);
let known = swarm.kbuckets.iter() let known = swarm.kbuckets.iter()
@ -138,7 +138,7 @@ fn bootstrap() {
return Poll::Ready(()) return Poll::Ready(())
} }
// Ignore any other event. // Ignore any other event.
Poll::Ready(Some(Ok(_))) => (), Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break, Poll::Pending => break,
} }
@ -186,7 +186,7 @@ fn query_iter() {
for (i, swarm) in swarms.iter_mut().enumerate() { for (i, swarm) in swarms.iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => { Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(swarm_ids[i], expected_swarm_id); assert_eq!(swarm_ids[i], expected_swarm_id);
assert_eq!(swarm.queries.size(), 0); assert_eq!(swarm.queries.size(), 0);
@ -196,7 +196,7 @@ fn query_iter() {
return Poll::Ready(()); return Poll::Ready(());
} }
// Ignore any other event. // Ignore any other event.
Poll::Ready(Some(Ok(_))) => (), Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break, Poll::Pending => break,
} }
@ -234,13 +234,13 @@ fn unresponsive_not_returned_direct() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => { Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(ok.peers.len(), 0); assert_eq!(ok.peers.len(), 0);
return Poll::Ready(()); return Poll::Ready(());
} }
// Ignore any other event. // Ignore any other event.
Poll::Ready(Some(Ok(_))) => (), Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break, Poll::Pending => break,
} }
@ -278,14 +278,14 @@ fn unresponsive_not_returned_indirect() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => { Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(ok.peers.len(), 1); assert_eq!(ok.peers.len(), 1);
assert_eq!(ok.peers[0], first_peer_id); assert_eq!(ok.peers[0], first_peer_id);
return Poll::Ready(()); return Poll::Ready(());
} }
// Ignore any other event. // Ignore any other event.
Poll::Ready(Some(Ok(_))) => (), Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break, Poll::Pending => break,
} }
@ -314,7 +314,7 @@ fn get_record_not_found() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Err(e))))) => { Poll::Ready(Some(KademliaEvent::GetRecordResult(Err(e)))) => {
if let GetRecordError::NotFound { key, closest_peers, } = e { if let GetRecordError::NotFound { key, closest_peers, } = e {
assert_eq!(key, target_key); assert_eq!(key, target_key);
assert_eq!(closest_peers.len(), 2); assert_eq!(closest_peers.len(), 2);
@ -326,7 +326,7 @@ fn get_record_not_found() {
} }
} }
// Ignore any other event. // Ignore any other event.
Poll::Ready(Some(Ok(_))) => (), Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break, Poll::Pending => break,
} }
@ -375,8 +375,8 @@ fn put_record() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::PutRecordResult(res)))) | Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) |
Poll::Ready(Some(Ok(KademliaEvent::RepublishRecordResult(res)))) => { Poll::Ready(Some(KademliaEvent::RepublishRecordResult(res))) => {
match res { match res {
Err(e) => panic!(e), Err(e) => panic!(e),
Ok(ok) => { Ok(ok) => {
@ -387,7 +387,7 @@ fn put_record() {
} }
} }
// Ignore any other event. // Ignore any other event.
Poll::Ready(Some(Ok(_))) => (), Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break, Poll::Pending => break,
} }
@ -474,13 +474,13 @@ fn get_value() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Ok(ok))))) => { Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => {
assert_eq!(ok.records.len(), 1); assert_eq!(ok.records.len(), 1);
assert_eq!(ok.records.first(), Some(&record)); assert_eq!(ok.records.first(), Some(&record));
return Poll::Ready(()); return Poll::Ready(());
} }
// Ignore any other event. // Ignore any other event.
Poll::Ready(Some(Ok(_))) => (), Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break, Poll::Pending => break,
} }
@ -513,13 +513,13 @@ fn get_value_many() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Ok(ok))))) => { Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => {
assert_eq!(ok.records.len(), num_results); assert_eq!(ok.records.len(), num_results);
assert_eq!(ok.records.first(), Some(&record)); assert_eq!(ok.records.first(), Some(&record));
return Poll::Ready(()); return Poll::Ready(());
} }
// Ignore any other event. // Ignore any other event.
Poll::Ready(Some(Ok(_))) => (), Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break, Poll::Pending => break,
} }
@ -561,8 +561,8 @@ fn add_provider() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(Ok(KademliaEvent::StartProvidingResult(res)))) | Poll::Ready(Some(KademliaEvent::StartProvidingResult(res))) |
Poll::Ready(Some(Ok(KademliaEvent::RepublishProviderResult(res)))) => { Poll::Ready(Some(KademliaEvent::RepublishProviderResult(res))) => {
match res { match res {
Err(e) => panic!(e), Err(e) => panic!(e),
Ok(ok) => { Ok(ok) => {
@ -572,7 +572,7 @@ fn add_provider() {
} }
} }
// Ignore any other event. // Ignore any other event.
Poll::Ready(Some(Ok(_))) => (), Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
Poll::Pending => break, Poll::Pending => break,
} }
@ -669,7 +669,7 @@ fn exceed_jobs_max_queries() {
for _ in 0 .. num { for _ in 0 .. num {
// There are no other nodes, so the queries finish instantly. // There are no other nodes, so the queries finish instantly.
if let Poll::Ready(Some(e)) = swarms[0].poll_next_unpin(ctx) { if let Poll::Ready(Some(e)) = swarms[0].poll_next_unpin(ctx) {
if let Ok(KademliaEvent::BootstrapResult(r)) = e { if let KademliaEvent::BootstrapResult(r) = e {
assert!(r.is_ok(), "Unexpected error") assert!(r.is_ok(), "Unexpected error")
} else { } else {
panic!("Unexpected event: {:?}", e) panic!("Unexpected event: {:?}", e)

View File

@ -60,7 +60,7 @@ fn ping() {
} }
loop { loop {
match swarm1.next().await.unwrap().unwrap() { match swarm1.next().await {
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
return (pid1.clone(), peer, rtt) return (pid1.clone(), peer, rtt)
}, },
@ -74,7 +74,7 @@ fn ping() {
Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap(); Swarm::dial_addr(&mut swarm2, rx.next().await.unwrap()).unwrap();
loop { loop {
match swarm2.next().await.unwrap().unwrap() { match swarm2.next().await {
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
return (pid2.clone(), peer, rtt) return (pid2.clone(), peer, rtt)
}, },

View File

@ -93,7 +93,7 @@ use libp2p_core::{
}; };
use registry::{Addresses, AddressIntoIter}; use registry::{Addresses, AddressIntoIter};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::{error, fmt, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}}; use std::{error, fmt, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
use std::collections::HashSet; use std::collections::HashSet;
/// Contains the state of the network, plus the way it should behave. /// Contains the state of the network, plus the way it should behave.
@ -107,6 +107,33 @@ pub type Swarm<TTransport, TBehaviour, TConnInfo = PeerId> = ExpandedSwarm<
TConnInfo, TConnInfo,
>; >;
/// Event generated by the `Swarm`.
#[derive(Debug)]
pub enum SwarmEvent<TBvEv> {
/// Event generated by the `NetworkBehaviour`.
Behaviour(TBvEv),
/// We are now connected to the given peer.
Connected(PeerId),
/// We are now disconnected from the given peer.
Disconnected(PeerId),
/// One of our listeners has reported a new local listening address.
NewListenAddr(Multiaddr),
/// One of our listeners has reported the expiration of a listening address.
ExpiredListenAddr(Multiaddr),
/// Tried to dial an address but it ended up being unreachaable.
UnreachableAddr {
/// `PeerId` that we were trying to reach. `None` if we don't know in advance which peer
/// we were trying to reach.
peer_id: Option<PeerId>,
/// Address that we failed to reach.
address: Multiaddr,
/// Error that has been encountered.
error: Box<dyn error::Error + Send>,
},
/// Startng to try to reach the given peer.
StartConnect(PeerId),
}
/// Contains the state of the network, plus the way it should behave. /// Contains the state of the network, plus the way it should behave.
pub struct ExpandedSwarm<TTransport, TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo = PeerId> pub struct ExpandedSwarm<TTransport, TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo = PeerId>
where where
@ -305,6 +332,166 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
pub fn unban_peer_id(me: &mut Self, peer_id: PeerId) { pub fn unban_peer_id(me: &mut Self, peer_id: PeerId) {
me.banned_peers.remove(&peer_id); me.banned_peers.remove(&peer_id);
} }
/// Returns the next event that happens in the `Swarm`.
///
/// Includes events from the `NetworkBehaviour` but also events about the connections status.
pub async fn next_event(&mut self) -> SwarmEvent<TBehaviour::OutEvent> {
future::poll_fn(move |cx| ExpandedSwarm::poll_next_event(Pin::new(self), cx)).await
}
/// Returns the next event produced by the [`NetworkBehaviour`].
pub async fn next(&mut self) -> TBehaviour::OutEvent {
future::poll_fn(move |cx| {
loop {
let event = futures::ready!(ExpandedSwarm::poll_next_event(Pin::new(self), cx));
if let SwarmEvent::Behaviour(event) = event {
return Poll::Ready(event);
}
}
}).await
}
/// Internal function used by everything event-related.
///
/// Polls the `Swarm` for the next event.
fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context)
-> Poll<SwarmEvent<TBehaviour::OutEvent>>
{
// We use a `this` variable because the compiler can't mutably borrow multiple times
// across a `Deref`.
let this = &mut *self;
loop {
let mut network_not_ready = false;
match this.network.poll(cx) {
Poll::Pending => network_not_ready = true,
Poll::Ready(NetworkEvent::NodeEvent { conn_info, event }) => {
this.behaviour.inject_node_event(conn_info.peer_id().clone(), event);
},
Poll::Ready(NetworkEvent::Connected { conn_info, endpoint }) => {
if this.banned_peers.contains(conn_info.peer_id()) {
this.network.peer(conn_info.peer_id().clone())
.into_connected()
.expect("the Network just notified us that we were connected; QED")
.close();
} else {
this.behaviour.inject_connected(conn_info.peer_id().clone(), endpoint);
return Poll::Ready(SwarmEvent::Connected(conn_info.peer_id().clone()));
}
},
Poll::Ready(NetworkEvent::NodeClosed { conn_info, endpoint, .. }) => {
this.behaviour.inject_disconnected(conn_info.peer_id(), endpoint);
return Poll::Ready(SwarmEvent::Disconnected(conn_info.peer_id().clone()));
},
Poll::Ready(NetworkEvent::Replaced { new_info, closed_endpoint, endpoint, .. }) => {
this.behaviour.inject_replaced(new_info.peer_id().clone(), closed_endpoint, endpoint);
},
Poll::Ready(NetworkEvent::IncomingConnection(incoming)) => {
let handler = this.behaviour.new_handler();
incoming.accept(handler.into_node_handler_builder());
},
Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => {
if !this.listened_addrs.contains(&listen_addr) {
this.listened_addrs.push(listen_addr.clone())
}
this.behaviour.inject_new_listen_addr(&listen_addr);
return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
}
Poll::Ready(NetworkEvent::ExpiredListenerAddress { listen_addr, .. }) => {
this.listened_addrs.retain(|a| a != &listen_addr);
this.behaviour.inject_expired_listen_addr(&listen_addr);
return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
}
Poll::Ready(NetworkEvent::ListenerClosed { listener_id, .. }) =>
this.behaviour.inject_listener_closed(listener_id),
Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) =>
this.behaviour.inject_listener_error(listener_id, &error),
Poll::Ready(NetworkEvent::IncomingConnectionError { .. }) => {},
Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, new_state }) => {
this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
if let network::PeerState::NotConnected = new_state {
this.behaviour.inject_dial_failure(&peer_id);
}
return Poll::Ready(SwarmEvent::UnreachableAddr {
peer_id: Some(peer_id.clone()),
address: multiaddr,
error: Box::new(error),
});
},
Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => {
this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error);
return Poll::Ready(SwarmEvent::UnreachableAddr {
peer_id: None,
address: multiaddr,
error: Box::new(error),
});
},
}
// Try to deliver pending event.
if let Some((id, pending)) = this.send_event_to_complete.take() {
if let Some(mut peer) = this.network.peer(id.clone()).into_connected() {
match peer.poll_ready_event(cx) {
Poll::Ready(()) => peer.start_send_event(pending),
Poll::Pending => {
this.send_event_to_complete = Some((id, pending));
return Poll::Pending
},
}
}
}
let behaviour_poll = {
let mut parameters = SwarmPollParameters {
local_peer_id: &mut this.network.local_peer_id(),
supported_protocols: &this.supported_protocols,
listened_addrs: &this.listened_addrs,
external_addrs: &this.external_addrs
};
this.behaviour.poll(cx, &mut parameters)
};
match behaviour_poll {
Poll::Pending if network_not_ready => return Poll::Pending,
Poll::Pending => (),
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
return Poll::Ready(SwarmEvent::Behaviour(event))
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
let _ = ExpandedSwarm::dial_addr(&mut *this, address);
},
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => {
if this.banned_peers.contains(&peer_id) {
this.behaviour.inject_dial_failure(&peer_id);
} else {
ExpandedSwarm::dial(&mut *this, peer_id.clone());
return Poll::Ready(SwarmEvent::StartConnect(peer_id))
}
},
Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => {
if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
if let Poll::Ready(()) = peer.poll_ready_event(cx) {
peer.start_send_event(event);
} else {
debug_assert!(this.send_event_to_complete.is_none());
this.send_event_to_complete = Some((peer_id, event));
return Poll::Pending;
}
}
},
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => {
for addr in this.network.address_translation(&address) {
if this.external_addrs.iter().all(|a| *a != addr) {
this.behaviour.inject_new_external_addr(&addr);
}
this.external_addrs.add(addr);
}
},
}
}
}
} }
impl<TTransport, TBehaviour, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> Stream for impl<TTransport, TBehaviour, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> Stream for
@ -340,125 +527,13 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
<NodeHandlerWrapper<<THandler as IntoProtocolsHandler>::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary <NodeHandlerWrapper<<THandler as IntoProtocolsHandler>::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static, TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{ {
type Item = Result<TBehaviour::OutEvent, io::Error>; type Item = TBehaviour::OutEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// We use a `this` variable because the compiler can't mutably borrow multiple times
// across a `Deref`.
let this = &mut *self;
loop { loop {
let mut network_not_ready = false; let event = futures::ready!(ExpandedSwarm::poll_next_event(self.as_mut(), cx));
if let SwarmEvent::Behaviour(event) = event {
match this.network.poll(cx) { return Poll::Ready(Some(event));
Poll::Pending => network_not_ready = true,
Poll::Ready(NetworkEvent::NodeEvent { conn_info, event }) => {
this.behaviour.inject_node_event(conn_info.peer_id().clone(), event);
},
Poll::Ready(NetworkEvent::Connected { conn_info, endpoint }) => {
if this.banned_peers.contains(conn_info.peer_id()) {
this.network.peer(conn_info.peer_id().clone())
.into_connected()
.expect("the Network just notified us that we were connected; QED")
.close();
} else {
this.behaviour.inject_connected(conn_info.peer_id().clone(), endpoint);
}
},
Poll::Ready(NetworkEvent::NodeClosed { conn_info, endpoint, .. }) => {
this.behaviour.inject_disconnected(conn_info.peer_id(), endpoint);
},
Poll::Ready(NetworkEvent::Replaced { new_info, closed_endpoint, endpoint, .. }) => {
this.behaviour.inject_replaced(new_info.peer_id().clone(), closed_endpoint, endpoint);
},
Poll::Ready(NetworkEvent::IncomingConnection(incoming)) => {
let handler = this.behaviour.new_handler();
incoming.accept(handler.into_node_handler_builder());
},
Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => {
if !this.listened_addrs.contains(&listen_addr) {
this.listened_addrs.push(listen_addr.clone())
}
this.behaviour.inject_new_listen_addr(&listen_addr);
}
Poll::Ready(NetworkEvent::ExpiredListenerAddress { listen_addr, .. }) => {
this.listened_addrs.retain(|a| a != &listen_addr);
this.behaviour.inject_expired_listen_addr(&listen_addr);
}
Poll::Ready(NetworkEvent::ListenerClosed { listener_id, .. }) =>
this.behaviour.inject_listener_closed(listener_id),
Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) =>
this.behaviour.inject_listener_error(listener_id, &error),
Poll::Ready(NetworkEvent::IncomingConnectionError { .. }) => {},
Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, new_state }) => {
this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
if let network::PeerState::NotConnected = new_state {
this.behaviour.inject_dial_failure(&peer_id);
}
},
Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => {
this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error);
},
}
// Try to deliver pending event.
if let Some((id, pending)) = this.send_event_to_complete.take() {
if let Some(mut peer) = this.network.peer(id.clone()).into_connected() {
match peer.poll_ready_event(cx) {
Poll::Ready(()) => peer.start_send_event(pending),
Poll::Pending => {
this.send_event_to_complete = Some((id, pending));
return Poll::Pending
},
}
}
}
let behaviour_poll = {
let mut parameters = SwarmPollParameters {
local_peer_id: &mut this.network.local_peer_id(),
supported_protocols: &this.supported_protocols,
listened_addrs: &this.listened_addrs,
external_addrs: &this.external_addrs
};
this.behaviour.poll(cx, &mut parameters)
};
match behaviour_poll {
Poll::Pending if network_not_ready => return Poll::Pending,
Poll::Pending => (),
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
return Poll::Ready(Some(Ok(event)))
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
let _ = ExpandedSwarm::dial_addr(&mut *this, address);
},
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => {
if this.banned_peers.contains(&peer_id) {
this.behaviour.inject_dial_failure(&peer_id);
} else {
ExpandedSwarm::dial(&mut *this, peer_id);
}
},
Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => {
if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
if let Poll::Ready(()) = peer.poll_ready_event(cx) {
peer.start_send_event(event);
} else {
debug_assert!(this.send_event_to_complete.is_none());
this.send_event_to_complete = Some((peer_id, event));
return Poll::Pending;
}
}
},
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => {
for addr in this.network.address_translation(&address) {
if this.external_addrs.iter().all(|a| *a != addr) {
this.behaviour.inject_new_external_addr(&addr);
}
this.external_addrs.add(addr)
}
},
} }
} }
} }