Tests for RawSwarm (#602)

* Add unit tests for core::nodes::NodeStream

* Move DummyMuxer to core/tests

* Address grumbles

* Impl Debug for SubstreamRef<P>

* Add test for poll()

* Don't need to open a substream

* pretty printer test

* More tests for NodeStream poll()

* ListenerStream unit tests: transport() and listeners()

* Tests for nodes/listeners.rs

* Add a few tests to help illustrate the "drowning" behaviour of busy listeners

* Tests for HandledNode

* Address grumbles

* Remove non-project specific stuff

* Address grumbles

* Prefer freestanding function

* Untangle the code for old shutdown test from the new tests
Add HandlerState and use it in TestBuilder
Shorter test names

* WIP – tests pass

* Use a newtype to lighten up the function signatures a bit
Test NotReady case

* Cleanup Event enum
Track events as they reach the Handler
Describe complex test logic

* Assert on the event trace

* More tests for poll()

* Switch to using usize as the OutboundOpenInfo so we can assert on event contents
More tests for poll()

* whitespace

* whitespace and spelling

* WIP tests for handled_node_tasks:Task

* wip

* Move Handler related code to dummy_handler

* Sort out the events going to/from node

* WIP tests for poll

* Add a TestBuilder for NodeTask tests
More NodeTask tests

* Fixes broken test after upstream changes

* Clarify the behaviour of is_shutting_down

* Fix broken test

* Test for task exit when muxers in- and outbound are closed

* Add question about impossible Async::NotReady

* Fix tests after recent changes on master

* Upstream changes

* Tests for HandledNodesTasks

* Add test for HandledNodesTasks poll

* Test we reach all nodes and then closed all nodes

* Test event emission by HandledNodesTasks

* Test starting CollectionStream

* Test that we can reach a peer after a successful connection

* Assert collection contains what we expect

* Test handler events get to the collectionstream

* Remaining tests for CollectionStream

* Run tests on single threaded runtime to avoid intermittent failures

* Remove call to shutdown – not needed

* No need to specify tokio version

* Change the DummyTransport Output type to match RawSwarm

* First few tests for RawSwarm
Whitespace: spaces

* Dummy impl of `dial()`

* Typos/grammar in docs

* Impl Debug for RawSwarmEvent and Peer
Test dialing peer without peer id

* Test num_incoming_negotiated

* A few more tests

* whitespace

* Add derive(Debug) for RawSwarm, ReachAttempts, PeerPendingConnect, PeerNotConnected
Fix intermittent test failures by polling repeatedly when getting NotReady
Add more tests

* Outline remaining work

* Test more error conditions

* wip

* Test error conditions

* Remove debug statements

* typo

* whitespace

* Use PeerId::random

* Uneeded dependency

* Use PeerId::random

* Somewhat less artificial nat_traversal test

* Use the IPv6 "black hole" prefix as an "unreachable" multiaddr

* Use a boolean on DummyTransport to make dial attempts fail

* No funny stuff for nat_traversal
This commit is contained in:
David
2018-11-26 08:57:19 +01:00
committed by Pierre Krieger
parent 1b05132d6a
commit 9fe7d56410
5 changed files with 641 additions and 62 deletions

View File

@ -52,7 +52,7 @@ use std::collections::VecDeque;
/// // Ask the `listeners` to start listening on the given multiaddress.
/// listeners.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
///
/// // You can retreive the list of active listeners with `listeners()`.
/// // You can retrieve the list of active listeners with `listeners()`.
/// println!("Listening on: {:?}", listeners.listeners().collect::<Vec<_>>());
///
/// // The `listeners` will now generate events when polled.
@ -282,35 +282,37 @@ mod tests {
use std::io;
use futures::{future::{self}, stream};
use tests::dummy_transport::{DummyTransport, ListenerState};
use tests::dummy_muxer::DummyMuxer;
use PeerId;
fn set_listener_state(ls: &mut ListenersStream<DummyTransport>, idx: usize, state: ListenerState) {
let l = &mut ls.listeners[idx];
l.listener =
match state {
ListenerState::Error => {
let stream = stream::poll_fn(|| future::err(io::Error::new(io::ErrorKind::Other, "oh noes")).poll() );
Box::new(stream)
}
ListenerState::Ok(async) => {
match async {
Async::NotReady => {
let stream = stream::poll_fn(|| Ok(Async::NotReady));
Box::new(stream)
}
Async::Ready(Some(n)) => {
let addr = l.address.clone();
let stream = stream::iter_ok(n..)
.map(move |stream| (future::ok(stream), addr.clone()));
Box::new(stream)
}
Async::Ready(None) => {
let stream = stream::empty();
Box::new(stream)
}
}
}
};
}
fn set_listener_state(ls: &mut ListenersStream<DummyTransport>, idx: usize, state: ListenerState) {
let l = &mut ls.listeners[idx];
l.listener =
match state {
ListenerState::Error => {
let stream = stream::poll_fn(|| future::err(io::Error::new(io::ErrorKind::Other, "oh noes")).poll() );
Box::new(stream)
}
ListenerState::Ok(async) => {
match async {
Async::NotReady => {
let stream = stream::poll_fn(|| Ok(Async::NotReady));
Box::new(stream)
}
Async::Ready(Some(tup)) => {
let addr = l.address.clone();
let stream = stream::poll_fn(move || Ok( Async::Ready(Some(tup.clone())) ))
.map(move |stream| (future::ok(stream), addr.clone()));
Box::new(stream)
}
Async::Ready(None) => {
let stream = stream::empty();
Box::new(stream)
}
}
}
};
}
#[test]
fn incoming_event() {
@ -344,8 +346,9 @@ mod tests {
#[test]
fn listener_stream_returns_transport() {
let t = DummyTransport::new();
let t_clone = t.clone();
let ls = ListenersStream::new(t);
assert_eq!(ls.transport(), &t);
assert_eq!(ls.transport(), &t_clone);
}
#[test]
@ -382,10 +385,13 @@ mod tests {
}
#[test]
fn listener_stream_poll_with_ready_listeners_yields_upgrade() {
let mut transport = DummyTransport::new();
transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
let mut ls = ListenersStream::new(transport);
fn listener_stream_poll_with_ready_listeners_is_ready() {
let mut t = DummyTransport::new();
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
let expected_output = (peer_id.clone(), muxer.clone());
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id, muxer) ))));
let mut ls = ListenersStream::new(t);
let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let addr2 = "/ip4/127.0.0.2/tcp/4321".parse::<Multiaddr>().expect("bad multiaddr");
@ -398,7 +404,7 @@ mod tests {
assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.2/tcp/4321");
assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => {
assert_eq!(tup, 1)
assert_eq!(tup, expected_output)
});
})
});
@ -407,7 +413,7 @@ mod tests {
assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.1/tcp/1234");
assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => {
assert_eq!(tup, 1)
assert_eq!(tup, expected_output)
});
})
});
@ -417,8 +423,7 @@ mod tests {
assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.1/tcp/1234");
assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => {
// Second time we poll this Listener, so we get `2` from the transport Stream
assert_eq!(tup, 2)
assert_eq!(tup, expected_output)
});
})
});
@ -441,7 +446,9 @@ mod tests {
#[test]
fn listener_stream_poll_with_erroring_listener_emits_closed_event() {
let mut t = DummyTransport::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id, muxer) ))));
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let mut ls = ListenersStream::new(t);
ls.listen_on(addr).expect("listen_on failed");
@ -455,8 +462,10 @@ mod tests {
#[test]
fn listener_stream_poll_chatty_listeners_each_get_their_turn() {
let mut t = DummyTransport::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
let mut ls = ListenersStream::new(t);
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id.clone(), muxer) )))); let mut ls = ListenersStream::new(t);
// Create 4 Listeners
for n in 0..4 {
let addr = format!("/ip4/127.0.0.{}/tcp/{}", n, n).parse::<Multiaddr>().expect("bad multiaddr");
@ -470,6 +479,7 @@ mod tests {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}
// Doing it again yields them in the same order
for n in (0..4).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
@ -485,6 +495,7 @@ mod tests {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}
for n in (0..3).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
@ -493,7 +504,10 @@ mod tests {
// Turning the last listener back on means we now have 4 "good"
// listeners, and each get their turn.
set_listener_state(&mut ls, 3, ListenerState::Ok(Async::Ready(Some(2))));
set_listener_state(
&mut ls, 3,
ListenerState::Ok(Async::Ready(Some( (peer_id, DummyMuxer::new()) )))
);
for n in (0..4).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
@ -504,7 +518,9 @@ mod tests {
#[test]
fn listener_stream_poll_processes_listeners_in_turn() {
let mut t = DummyTransport::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1))));
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id, muxer) ))));
let mut ls = ListenersStream::new(t);
for n in 0..4 {
let addr = format!("/ip4/127.0.0.{}/tcp/{}", n, n).parse::<Multiaddr>().expect("bad multiaddr");

View File

@ -39,11 +39,13 @@ use crate::{
use fnv::FnvHashMap;
use futures::{prelude::*, future};
use std::{
fmt,
collections::hash_map::{Entry, OccupiedEntry},
io::{Error as IoError, ErrorKind as IoErrorKind}
};
/// Implementation of `Stream` that handles the nodes.
#[derive(Debug)]
pub struct RawSwarm<TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
@ -59,6 +61,7 @@ where
reach_attempts: ReachAttempts,
}
#[derive(Debug)]
struct ReachAttempts {
/// Attempts to reach a peer.
out_reach_attempts: FnvHashMap<PeerId, OutReachAttempt>,
@ -192,6 +195,83 @@ where
},
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TOutEvent: fmt::Debug,
TTrans: Transport,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
RawSwarmEvent::ListenerClosed { ref listen_addr, listener: _, ref result } => {
f.debug_struct("ListenerClosed")
.field("listen_addr", listen_addr)
.field("result", result)
.finish()
}
RawSwarmEvent::IncomingConnection( IncomingConnectionEvent { ref listen_addr, ref send_back_addr, .. } ) => {
f.debug_struct("IncomingConnection")
.field("listen_addr", listen_addr)
.field("send_back_addr", send_back_addr)
.finish()
}
RawSwarmEvent::IncomingConnectionError { ref listen_addr, ref send_back_addr, ref error} => {
f.debug_struct("IncomingConnectionError")
.field("listen_addr", listen_addr)
.field("send_back_addr", send_back_addr)
.field("error", error)
.finish()
}
RawSwarmEvent::Connected { ref peer_id, ref endpoint } => {
f.debug_struct("Connected")
.field("peer_id", peer_id)
.field("endpoint", endpoint)
.finish()
}
RawSwarmEvent::Replaced { ref peer_id, ref closed_endpoint, ref endpoint } => {
f.debug_struct("Replaced")
.field("peer_id", peer_id)
.field("closed_endpoint", closed_endpoint)
.field("endpoint", endpoint)
.finish()
}
RawSwarmEvent::NodeClosed { ref peer_id, ref endpoint } => {
f.debug_struct("NodeClosed")
.field("peer_id", peer_id)
.field("endpoint", endpoint)
.finish()
}
RawSwarmEvent::NodeError { ref peer_id, ref endpoint, ref error } => {
f.debug_struct("NodeError")
.field("peer_id", peer_id)
.field("endpoint", endpoint)
.field("error", error)
.finish()
}
RawSwarmEvent::DialError { ref remain_addrs_attempt, ref peer_id, ref multiaddr, ref error } => {
f.debug_struct("DialError")
.field("remain_addrs_attempt", remain_addrs_attempt)
.field("peer_id", peer_id)
.field("multiaddr", multiaddr)
.field("error", error)
.finish()
}
RawSwarmEvent::UnknownPeerDialError { ref multiaddr, ref error, .. } => {
f.debug_struct("UnknownPeerDialError")
.field("multiaddr", multiaddr)
.field("error", error)
.finish()
}
RawSwarmEvent::NodeEvent { ref peer_id, ref event } => {
f.debug_struct("NodeEvent")
.field("peer_id", peer_id)
.field("event", event)
.finish()
}
}
}
}
/// A new connection arrived on a listener.
pub struct IncomingConnectionEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a>
where TTrans: Transport
@ -844,6 +924,32 @@ where
NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler>),
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
Peer::Connected( PeerConnected { peer: _, ref peer_id, ref connected_points }) => {
f.debug_struct("Connected")
.field("peer_id", peer_id)
.field("connected_points", connected_points)
.finish()
}
Peer::PendingConnect( PeerPendingConnect { ref attempt, .. } ) => {
f.debug_struct("PendingConnect")
.field("attempt", attempt)
.finish()
}
Peer::NotConnected(PeerNotConnected { ref peer_id, .. }) => {
f.debug_struct("NotConnected")
.field("peer_id", peer_id)
.finish()
}
}
}
}
// TODO: add other similar methods that wrap to the ones of `PeerNotConnected`
impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler>
Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
@ -1008,6 +1114,7 @@ impl<'a, TInEvent> PeerConnected<'a, TInEvent> {
}
/// Access to a peer we are attempting to connect to.
#[derive(Debug)]
pub struct PeerPendingConnect<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a> {
attempt: OccupiedEntry<'a, PeerId, OutReachAttempt>,
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler>,
@ -1055,6 +1162,7 @@ impl<'a, TInEvent, TOutEvent, THandler> PeerPendingConnect<'a, TInEvent, TOutEve
}
/// Access to a peer we're not connected to.
#[derive(Debug)]
pub struct PeerNotConnected<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a>
where
TTrans: Transport,
@ -1124,3 +1232,426 @@ where
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::runtime::{Builder, Runtime};
use tests::dummy_transport::DummyTransport;
use tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent};
use tests::dummy_transport::ListenerState;
use tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
use nodes::NodeHandlerEvent;
#[test]
fn query_transport() {
let transport = DummyTransport::new();
let transport2 = transport.clone();
let raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport);
assert_eq!(raw_swarm.transport(), &transport2);
}
#[test]
fn starts_listening() {
let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let addr2 = addr.clone();
assert!(raw_swarm.listen_on(addr).is_ok());
let listeners = raw_swarm.listeners().collect::<Vec<&Multiaddr>>();
assert_eq!(listeners.len(), 1);
assert_eq!(listeners[0], &addr2);
}
#[test]
fn nat_traversal_transforms_the_observed_address_according_to_the_transport_used() {
// the DummyTransport nat_traversal increments the port number by one for Ip4 addresses
let transport = DummyTransport::new();
let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport);
let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
// An unrelated outside address is returned as-is, no transform
let outside_addr1 = "/memory".parse::<Multiaddr>().expect("bad multiaddr");
let addr2 = "/ip4/127.0.0.2/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let outside_addr2 = "/ip4/127.0.0.2/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
raw_swarm.listen_on(addr1).unwrap();
raw_swarm.listen_on(addr2).unwrap();
let natted = raw_swarm
.nat_traversal(&outside_addr1)
.map(|a| a.to_string())
.collect::<Vec<_>>();
assert!(natted.is_empty());
let natted = raw_swarm
.nat_traversal(&outside_addr2)
.map(|a| a.to_string())
.collect::<Vec<_>>();
assert_eq!(natted, vec!["/ip4/127.0.0.2/tcp/1234"])
}
#[test]
fn successful_dial_reaches_a_node() {
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let dial_res = swarm.dial(addr, Handler::default());
assert!(dial_res.is_ok());
// Poll the swarm until we get a `NodeReached` then assert on the peer:
// it's there and it's connected.
let swarm = Arc::new(Mutex::new(swarm));
let mut rt = Runtime::new().unwrap();
let mut peer_id : Option<PeerId> = None;
// Drive forward until we're Connected
while peer_id.is_none() {
let swarm_fut = swarm.clone();
peer_id = rt.block_on(future::poll_fn(move || -> Poll<Option<PeerId>, ()> {
let mut swarm = swarm_fut.lock();
let poll_res = swarm.poll();
match poll_res {
Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))),
_ => Ok(Async::Ready(None))
}
})).expect("tokio works");
}
let mut swarm = swarm.lock();
let peer = swarm.peer(peer_id.unwrap());
assert_matches!(peer, Peer::Connected(PeerConnected{..}));
}
#[test]
fn num_incoming_negotiated() {
let mut transport = DummyTransport::new();
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
// Set up listener to see an incoming connection
transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some((peer_id, muxer)))));
let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport);
swarm.listen_on("/memory".parse().unwrap()).unwrap();
// no incoming yet
assert_eq!(swarm.num_incoming_negotiated(), 0);
let mut rt = Runtime::new().unwrap();
let swarm = Arc::new(Mutex::new(swarm));
let swarm_fut = swarm.clone();
let fut = future::poll_fn(move || -> Poll<_, ()> {
let mut swarm_fut = swarm_fut.lock();
assert_matches!(swarm_fut.poll(), Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => {
incoming.accept(Handler::default());
});
Ok(Async::Ready(()))
});
rt.block_on(fut).expect("tokio works");
let swarm = swarm.lock();
// Now there's an incoming connection
assert_eq!(swarm.num_incoming_negotiated(), 1);
}
#[test]
fn broadcasted_events_reach_active_nodes() {
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
let mut muxer = DummyMuxer::new();
muxer.set_inbound_connection_state(DummyConnectionState::Pending);
muxer.set_outbound_connection_state(DummyConnectionState::Opened);
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let mut handler = Handler::default();
handler.next_states = vec![HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") ))),];
let dial_result = swarm.dial(addr, handler);
assert!(dial_result.is_ok());
swarm.broadcast_event(&InEvent::NextState);
let swarm = Arc::new(Mutex::new(swarm));
let mut rt = Runtime::new().unwrap();
let mut peer_id : Option<PeerId> = None;
while peer_id.is_none() {
let swarm_fut = swarm.clone();
peer_id = rt.block_on(future::poll_fn(move || -> Poll<Option<PeerId>, ()> {
let mut swarm = swarm_fut.lock();
let poll_res = swarm.poll();
match poll_res {
Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))),
_ => Ok(Async::Ready(None))
}
})).expect("tokio works");
}
let mut keep_polling = true;
while keep_polling {
let swarm_fut = swarm.clone();
keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
match swarm.poll() {
Async::Ready(event) => {
assert_matches!(event, RawSwarmEvent::NodeEvent { peer_id: _, event: inner_event } => {
// The event we sent reached the node and triggered sending the out event we told it to return
assert_matches!(inner_event, OutEvent::Custom("from handler 1"));
});
Ok(Async::Ready(false))
},
_ => Ok(Async::Ready(true))
}
})).expect("tokio works");
}
}
#[test]
fn querying_for_pending_peer() {
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
let peer_id = PeerId::random();
let peer = swarm.peer(peer_id.clone());
assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. }));
let addr = "/memory".parse().expect("bad multiaddr");
let pending_peer = peer.as_not_connected().unwrap().connect(addr, Handler::default());
assert!(pending_peer.is_ok());
assert_matches!(pending_peer, Ok(PeerPendingConnect { .. } ));
}
#[test]
fn querying_for_unknown_peer() {
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
let peer_id = PeerId::random();
let peer = swarm.peer(peer_id.clone());
assert_matches!(peer, Peer::NotConnected( PeerNotConnected { nodes: _, peer_id: node_peer_id }) => {
assert_eq!(node_peer_id, peer_id);
});
}
#[test]
fn querying_for_connected_peer() {
let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new());
// Dial a node
let addr = "/ip4/127.0.0.1/tcp/1234".parse().expect("bad multiaddr");
swarm.dial(addr, Handler::default()).expect("dialing works");
let swarm = Arc::new(Mutex::new(swarm));
let mut rt = Runtime::new().unwrap();
// Drive it forward until we connect; extract the new PeerId.
let mut peer_id : Option<PeerId> = None;
while peer_id.is_none() {
let swarm_fut = swarm.clone();
peer_id = rt.block_on(future::poll_fn(move || -> Poll<Option<PeerId>, ()> {
let mut swarm = swarm_fut.lock();
let poll_res = swarm.poll();
match poll_res {
Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))),
_ => Ok(Async::Ready(None))
}
})).expect("tokio works");
}
// We're connected.
let mut swarm = swarm.lock();
let peer = swarm.peer(peer_id.unwrap());
assert_matches!(peer, Peer::Connected( PeerConnected { .. } ));
}
#[test]
fn poll_with_closed_listener() {
let mut transport = DummyTransport::new();
// Set up listener to be closed
transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(None)));
let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport);
swarm.listen_on("/memory".parse().unwrap()).unwrap();
let mut rt = Runtime::new().unwrap();
let swarm = Arc::new(Mutex::new(swarm));
let swarm_fut = swarm.clone();
let fut = future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::ListenerClosed { .. } ));
Ok(Async::Ready(()))
});
rt.block_on(fut).expect("tokio works");
}
#[test]
fn unknown_peer_that_is_unreachable_yields_unknown_peer_dial_error() {
let mut transport = DummyTransport::new();
transport.make_dial_fail();
let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport);
let addr = "/memory".parse::<Multiaddr>().expect("bad multiaddr");
let handler = Handler::default();
let dial_result = swarm.dial(addr, handler);
assert!(dial_result.is_ok());
let swarm = Arc::new(Mutex::new(swarm));
let mut rt = Runtime::new().unwrap();
// Drive it forward until we hear back from the node.
let mut keep_polling = true;
while keep_polling {
let swarm_fut = swarm.clone();
keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
match swarm.poll() {
Async::NotReady => Ok(Async::Ready(true)),
Async::Ready(event) => {
assert_matches!(event, RawSwarmEvent::UnknownPeerDialError { .. } );
Ok(Async::Ready(false))
},
}
})).expect("tokio works");
}
}
#[test]
fn known_peer_that_is_unreachable_yields_dial_error() {
let mut transport = DummyTransport::new();
let peer_id = PeerId::random();
transport.set_next_peer_id(&peer_id);
transport.make_dial_fail();
let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport)));
{
let swarm1 = swarm.clone();
let mut swarm1 = swarm1.lock();
let peer = swarm1.peer(peer_id.clone());
assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. }));
let addr = "/memory".parse::<Multiaddr>().expect("bad multiaddr");
let pending_peer = peer.as_not_connected().unwrap().connect(addr, Handler::default());
assert!(pending_peer.is_ok());
assert_matches!(pending_peer, Ok(PeerPendingConnect { .. } ));
}
let mut rt = Runtime::new().unwrap();
// Drive it forward until we hear back from the node.
let mut keep_polling = true;
while keep_polling {
let swarm_fut = swarm.clone();
let peer_id = peer_id.clone();
keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
match swarm.poll() {
Async::NotReady => Ok(Async::Ready(true)),
Async::Ready(event) => {
let failed_peer_id = assert_matches!(
event,
RawSwarmEvent::DialError { remain_addrs_attempt: _, peer_id: failed_peer_id, .. } => failed_peer_id
);
assert_eq!(peer_id, failed_peer_id);
Ok(Async::Ready(false))
},
}
})).expect("tokio works");
}
}
#[test]
fn yields_node_error_when_there_is_an_error_after_successful_connect() {
let mut transport = DummyTransport::new();
let peer_id = PeerId::random();
transport.set_next_peer_id(&peer_id);
let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport)));
{
// Set up an outgoing connection with a PeerId we know
let swarm1 = swarm.clone();
let mut swarm1 = swarm1.lock();
let peer = swarm1.peer(peer_id.clone());
let addr = "/unix/reachable".parse().expect("bad multiaddr");
let mut handler = Handler::default();
// Force an error
handler.next_states = vec![ HandlerState::Err ];
peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer");
}
// Ensure we run on a single thread
let mut rt = Builder::new().core_threads(1).build().unwrap();
// Drive it forward until we connect to the node.
let mut keep_polling = true;
while keep_polling {
let swarm_fut = swarm.clone();
keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
// Push the Handler into an error state on the next poll
swarm.broadcast_event(&InEvent::NextState);
match swarm.poll() {
Async::NotReady => Ok(Async::Ready(true)),
Async::Ready(event) => {
assert_matches!(event, RawSwarmEvent::Connected { .. });
// We're connected, we can move on
Ok(Async::Ready(false))
},
}
})).expect("tokio works");
}
// Poll again. It is going to be a NodeError because of how the
// handler's next state was set up.
let swarm_fut = swarm.clone();
let expected_peer_id = peer_id.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeError { peer_id, .. }) => {
assert_eq!(peer_id, expected_peer_id);
});
Ok(Async::Ready(()))
})).expect("tokio works");
}
#[test]
fn yields_node_closed_when_the_node_closes_after_successful_connect() {
let mut transport = DummyTransport::new();
let peer_id = PeerId::random();
transport.set_next_peer_id(&peer_id);
let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport)));
{
// Set up an outgoing connection with a PeerId we know
let swarm1 = swarm.clone();
let mut swarm1 = swarm1.lock();
let peer = swarm1.peer(peer_id.clone());
let addr = "/unix/reachable".parse().expect("bad multiaddr");
let mut handler = Handler::default();
// Force handler to close
handler.next_states = vec![ HandlerState::Ready(None) ];
peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer");
}
// Ensure we run on a single thread
let mut rt = Builder::new().core_threads(1).build().unwrap();
// Drive it forward until we connect to the node.
let mut keep_polling = true;
while keep_polling {
let swarm_fut = swarm.clone();
keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
// Push the Handler into the closed state on the next poll
swarm.broadcast_event(&InEvent::NextState);
match swarm.poll() {
Async::NotReady => Ok(Async::Ready(true)),
Async::Ready(event) => {
assert_matches!(event, RawSwarmEvent::Connected { .. });
// We're connected, we can move on
Ok(Async::Ready(false))
},
}
})).expect("tokio works");
}
// Poll again. It is going to be a NodeClosed because of how the
// handler's next state was set up.
let swarm_fut = swarm.clone();
let expected_peer_id = peer_id.clone();
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut swarm = swarm_fut.lock();
assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeClosed { peer_id, .. }) => {
assert_eq!(peer_id, expected_peer_id);
});
Ok(Async::Ready(()))
})).expect("tokio works");
}
}

View File

@ -42,14 +42,14 @@ pub enum DummyConnectionState {
Closed, // use this to trigger the Async::Ready(None) code path
Opened, // use this to trigger the Async::Ready(Some(_)) code path
}
#[derive(Debug, Clone)]
#[derive(Debug, PartialEq, Clone)]
struct DummyConnection {
state: DummyConnectionState,
}
/// `DummyMuxer` implements `StreamMuxer` and methods to control its behaviour when used in tests
#[derive(Debug, Clone)]
pub struct DummyMuxer {
#[derive(Debug, PartialEq, Clone)]
pub struct DummyMuxer{
in_connection: DummyConnection,
out_connection: DummyConnection,
}

View File

@ -28,33 +28,47 @@ use futures::{
stream,
};
use std::io;
use {Multiaddr, Transport};
use {Multiaddr, PeerId, Transport};
use tests::dummy_muxer::DummyMuxer;
#[derive(Debug, PartialEq, Clone, Copy)]
#[derive(Debug, PartialEq, Clone)]
pub(crate) enum ListenerState {
/// The `usize` indexes items produced by the listener
Ok(Async<Option<usize>>),
Error,
Ok(Async<Option<(PeerId, DummyMuxer)>>),
Error
}
#[derive(Debug, PartialEq, Clone, Copy)]
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct DummyTransport {
/// The current state of Listeners.
listener_state: ListenerState,
/// The next peer returned from dial().
next_peer_id: Option<PeerId>,
/// When true, all dial attempts return error.
dial_should_fail: bool,
}
impl DummyTransport {
pub(crate) fn new() -> Self {
DummyTransport {
listener_state: ListenerState::Ok(Async::NotReady),
next_peer_id: None,
dial_should_fail: false,
}
}
pub(crate) fn set_initial_listener_state(&mut self, state: ListenerState) {
self.listener_state = state;
}
pub(crate) fn set_next_peer_id(&mut self, peer_id: &PeerId) {
self.next_peer_id = Some(peer_id.clone());
}
pub(crate) fn make_dial_fail(&mut self) {
self.dial_should_fail = true;
}
}
impl Transport for DummyTransport {
type Output = usize;
type Listener =
Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = io::Error> + Send>;
type Output = (PeerId, DummyMuxer);
type Listener = Box<Stream<Item=(Self::ListenerUpgrade, Multiaddr), Error=io::Error> + Send>;
type ListenerUpgrade = FutureResult<Self::Output, io::Error>;
type Dial = Box<Future<Item = Self::Output, Error = io::Error> + Send>;
@ -71,8 +85,8 @@ impl Transport for DummyTransport {
let stream = stream::poll_fn(|| Ok(Async::NotReady)).map(tupelize);
(Box::new(stream), addr2)
}
Async::Ready(Some(n)) => {
let stream = stream::iter_ok(n..).map(tupelize);
Async::Ready(Some(tup)) => {
let stream = stream::poll_fn(move || Ok( Async::Ready(Some(tup.clone()) ))).map(tupelize);
(Box::new(stream), addr2)
}
Async::Ready(None) => {
@ -89,10 +103,28 @@ impl Transport for DummyTransport {
where
Self: Sized,
{
unimplemented!();
let peer_id = if let Some(peer_id) = self.next_peer_id {
peer_id
} else {
PeerId::random()
};
let fut =
if self.dial_should_fail {
let err_string = format!("unreachable host error, peer={:?}", peer_id);
future::err(io::Error::new(io::ErrorKind::Other, err_string))
} else {
future::ok((peer_id, DummyMuxer::new()))
};
Ok(Box::new(fut))
}
fn nat_traversal(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
unimplemented!();
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if server == observed {
Some(observed.clone())
} else {
None
}
}
}

View File

@ -94,7 +94,7 @@ pub trait Transport {
where
Self: Sized;
/// Dial to the given multi-addr.
/// Dial the given multi-addr.
///
/// Returns either a future which may resolve to a connection, or gives back the multiaddress.
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
@ -114,7 +114,7 @@ pub trait Transport {
/// as is.
///
/// Returns `None` if nothing can be determined. This happens if this trait implementation
/// doesn't recognize the protocols, or if `server` and `observed` are related.
/// doesn't recognize the protocols, or if `server` and `observed` are not related.
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
/// Turns this `Transport` into an abstract boxed transport.