diff --git a/core/src/nodes/listeners.rs b/core/src/nodes/listeners.rs index 66ea0e89..da6f717c 100644 --- a/core/src/nodes/listeners.rs +++ b/core/src/nodes/listeners.rs @@ -52,7 +52,7 @@ use std::collections::VecDeque; /// // Ask the `listeners` to start listening on the given multiaddress. /// listeners.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); /// -/// // You can retreive the list of active listeners with `listeners()`. +/// // You can retrieve the list of active listeners with `listeners()`. /// println!("Listening on: {:?}", listeners.listeners().collect::>()); /// /// // The `listeners` will now generate events when polled. @@ -282,35 +282,37 @@ mod tests { use std::io; use futures::{future::{self}, stream}; use tests::dummy_transport::{DummyTransport, ListenerState}; + use tests::dummy_muxer::DummyMuxer; + use PeerId; - fn set_listener_state(ls: &mut ListenersStream, idx: usize, state: ListenerState) { - let l = &mut ls.listeners[idx]; - l.listener = - match state { - ListenerState::Error => { - let stream = stream::poll_fn(|| future::err(io::Error::new(io::ErrorKind::Other, "oh noes")).poll() ); - Box::new(stream) - } - ListenerState::Ok(async) => { - match async { - Async::NotReady => { - let stream = stream::poll_fn(|| Ok(Async::NotReady)); - Box::new(stream) - } - Async::Ready(Some(n)) => { - let addr = l.address.clone(); - let stream = stream::iter_ok(n..) - .map(move |stream| (future::ok(stream), addr.clone())); - Box::new(stream) - } - Async::Ready(None) => { - let stream = stream::empty(); - Box::new(stream) - } - } - } - }; - } + fn set_listener_state(ls: &mut ListenersStream, idx: usize, state: ListenerState) { + let l = &mut ls.listeners[idx]; + l.listener = + match state { + ListenerState::Error => { + let stream = stream::poll_fn(|| future::err(io::Error::new(io::ErrorKind::Other, "oh noes")).poll() ); + Box::new(stream) + } + ListenerState::Ok(async) => { + match async { + Async::NotReady => { + let stream = stream::poll_fn(|| Ok(Async::NotReady)); + Box::new(stream) + } + Async::Ready(Some(tup)) => { + let addr = l.address.clone(); + let stream = stream::poll_fn(move || Ok( Async::Ready(Some(tup.clone())) )) + .map(move |stream| (future::ok(stream), addr.clone())); + Box::new(stream) + } + Async::Ready(None) => { + let stream = stream::empty(); + Box::new(stream) + } + } + } + }; + } #[test] fn incoming_event() { @@ -344,8 +346,9 @@ mod tests { #[test] fn listener_stream_returns_transport() { let t = DummyTransport::new(); + let t_clone = t.clone(); let ls = ListenersStream::new(t); - assert_eq!(ls.transport(), &t); + assert_eq!(ls.transport(), &t_clone); } #[test] @@ -382,10 +385,13 @@ mod tests { } #[test] - fn listener_stream_poll_with_ready_listeners_yields_upgrade() { - let mut transport = DummyTransport::new(); - transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1)))); - let mut ls = ListenersStream::new(transport); + fn listener_stream_poll_with_ready_listeners_is_ready() { + let mut t = DummyTransport::new(); + let peer_id = PeerId::random(); + let muxer = DummyMuxer::new(); + let expected_output = (peer_id.clone(), muxer.clone()); + t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id, muxer) )))); + let mut ls = ListenersStream::new(t); let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); let addr2 = "/ip4/127.0.0.2/tcp/4321".parse::().expect("bad multiaddr"); @@ -398,7 +404,7 @@ mod tests { assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => { assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.2/tcp/4321"); assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => { - assert_eq!(tup, 1) + assert_eq!(tup, expected_output) }); }) }); @@ -407,7 +413,7 @@ mod tests { assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => { assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.1/tcp/1234"); assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => { - assert_eq!(tup, 1) + assert_eq!(tup, expected_output) }); }) }); @@ -417,8 +423,7 @@ mod tests { assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => { assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.1/tcp/1234"); assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => { - // Second time we poll this Listener, so we get `2` from the transport Stream - assert_eq!(tup, 2) + assert_eq!(tup, expected_output) }); }) }); @@ -441,7 +446,9 @@ mod tests { #[test] fn listener_stream_poll_with_erroring_listener_emits_closed_event() { let mut t = DummyTransport::new(); - t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1)))); + let peer_id = PeerId::random(); + let muxer = DummyMuxer::new(); + t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id, muxer) )))); let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); let mut ls = ListenersStream::new(t); ls.listen_on(addr).expect("listen_on failed"); @@ -455,8 +462,10 @@ mod tests { #[test] fn listener_stream_poll_chatty_listeners_each_get_their_turn() { let mut t = DummyTransport::new(); - t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1)))); - let mut ls = ListenersStream::new(t); + let peer_id = PeerId::random(); + let muxer = DummyMuxer::new(); + t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id.clone(), muxer) )))); let mut ls = ListenersStream::new(t); + // Create 4 Listeners for n in 0..4 { let addr = format!("/ip4/127.0.0.{}/tcp/{}", n, n).parse::().expect("bad multiaddr"); @@ -470,6 +479,7 @@ mod tests { assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n)) }) } + // Doing it again yields them in the same order for n in (0..4).rev() { assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => { @@ -485,6 +495,7 @@ mod tests { assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n)) }) } + for n in (0..3).rev() { assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => { assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n)) @@ -493,7 +504,10 @@ mod tests { // Turning the last listener back on means we now have 4 "good" // listeners, and each get their turn. - set_listener_state(&mut ls, 3, ListenerState::Ok(Async::Ready(Some(2)))); + set_listener_state( + &mut ls, 3, + ListenerState::Ok(Async::Ready(Some( (peer_id, DummyMuxer::new()) ))) + ); for n in (0..4).rev() { assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => { assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n)) @@ -504,7 +518,9 @@ mod tests { #[test] fn listener_stream_poll_processes_listeners_in_turn() { let mut t = DummyTransport::new(); - t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(1)))); + let peer_id = PeerId::random(); + let muxer = DummyMuxer::new(); + t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id, muxer) )))); let mut ls = ListenersStream::new(t); for n in 0..4 { let addr = format!("/ip4/127.0.0.{}/tcp/{}", n, n).parse::().expect("bad multiaddr"); diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 8d02a9ec..545f3b09 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -39,11 +39,13 @@ use crate::{ use fnv::FnvHashMap; use futures::{prelude::*, future}; use std::{ + fmt, collections::hash_map::{Entry, OccupiedEntry}, io::{Error as IoError, ErrorKind as IoErrorKind} }; /// Implementation of `Stream` that handles the nodes. +#[derive(Debug)] pub struct RawSwarm where TTrans: Transport, @@ -59,6 +61,7 @@ where reach_attempts: ReachAttempts, } +#[derive(Debug)] struct ReachAttempts { /// Attempts to reach a peer. out_reach_attempts: FnvHashMap, @@ -192,6 +195,83 @@ where }, } +impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler> +where + TOutEvent: fmt::Debug, + TTrans: Transport, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + match *self { + RawSwarmEvent::ListenerClosed { ref listen_addr, listener: _, ref result } => { + f.debug_struct("ListenerClosed") + .field("listen_addr", listen_addr) + .field("result", result) + .finish() + } + RawSwarmEvent::IncomingConnection( IncomingConnectionEvent { ref listen_addr, ref send_back_addr, .. } ) => { + f.debug_struct("IncomingConnection") + .field("listen_addr", listen_addr) + .field("send_back_addr", send_back_addr) + .finish() + } + RawSwarmEvent::IncomingConnectionError { ref listen_addr, ref send_back_addr, ref error} => { + f.debug_struct("IncomingConnectionError") + .field("listen_addr", listen_addr) + .field("send_back_addr", send_back_addr) + .field("error", error) + .finish() + } + RawSwarmEvent::Connected { ref peer_id, ref endpoint } => { + f.debug_struct("Connected") + .field("peer_id", peer_id) + .field("endpoint", endpoint) + .finish() + } + RawSwarmEvent::Replaced { ref peer_id, ref closed_endpoint, ref endpoint } => { + f.debug_struct("Replaced") + .field("peer_id", peer_id) + .field("closed_endpoint", closed_endpoint) + .field("endpoint", endpoint) + .finish() + } + RawSwarmEvent::NodeClosed { ref peer_id, ref endpoint } => { + f.debug_struct("NodeClosed") + .field("peer_id", peer_id) + .field("endpoint", endpoint) + .finish() + } + RawSwarmEvent::NodeError { ref peer_id, ref endpoint, ref error } => { + f.debug_struct("NodeError") + .field("peer_id", peer_id) + .field("endpoint", endpoint) + .field("error", error) + .finish() + } + RawSwarmEvent::DialError { ref remain_addrs_attempt, ref peer_id, ref multiaddr, ref error } => { + f.debug_struct("DialError") + .field("remain_addrs_attempt", remain_addrs_attempt) + .field("peer_id", peer_id) + .field("multiaddr", multiaddr) + .field("error", error) + .finish() + } + RawSwarmEvent::UnknownPeerDialError { ref multiaddr, ref error, .. } => { + f.debug_struct("UnknownPeerDialError") + .field("multiaddr", multiaddr) + .field("error", error) + .finish() + } + RawSwarmEvent::NodeEvent { ref peer_id, ref event } => { + f.debug_struct("NodeEvent") + .field("peer_id", peer_id) + .field("event", event) + .finish() + } + } + } +} + + /// A new connection arrived on a listener. pub struct IncomingConnectionEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a> where TTrans: Transport @@ -844,6 +924,32 @@ where NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler>), } +impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for Peer<'a, TTrans, TInEvent, TOutEvent, THandler> +where + TTrans: Transport, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + match *self { + Peer::Connected( PeerConnected { peer: _, ref peer_id, ref connected_points }) => { + f.debug_struct("Connected") + .field("peer_id", peer_id) + .field("connected_points", connected_points) + .finish() + } + Peer::PendingConnect( PeerPendingConnect { ref attempt, .. } ) => { + f.debug_struct("PendingConnect") + .field("attempt", attempt) + .finish() + } + Peer::NotConnected(PeerNotConnected { ref peer_id, .. }) => { + f.debug_struct("NotConnected") + .field("peer_id", peer_id) + .finish() + } + } + } +} + // TODO: add other similar methods that wrap to the ones of `PeerNotConnected` impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> @@ -1008,6 +1114,7 @@ impl<'a, TInEvent> PeerConnected<'a, TInEvent> { } /// Access to a peer we are attempting to connect to. +#[derive(Debug)] pub struct PeerPendingConnect<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a> { attempt: OccupiedEntry<'a, PeerId, OutReachAttempt>, active_nodes: &'a mut CollectionStream, @@ -1055,6 +1162,7 @@ impl<'a, TInEvent, TOutEvent, THandler> PeerPendingConnect<'a, TInEvent, TOutEve } /// Access to a peer we're not connected to. +#[derive(Debug)] pub struct PeerNotConnected<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a> where TTrans: Transport, @@ -1124,3 +1232,426 @@ where }) } } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use parking_lot::Mutex; + use tokio::runtime::{Builder, Runtime}; + use tests::dummy_transport::DummyTransport; + use tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent}; + use tests::dummy_transport::ListenerState; + use tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; + use nodes::NodeHandlerEvent; + + #[test] + fn query_transport() { + let transport = DummyTransport::new(); + let transport2 = transport.clone(); + let raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport); + assert_eq!(raw_swarm.transport(), &transport2); + } + + #[test] + fn starts_listening() { + let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + let addr2 = addr.clone(); + assert!(raw_swarm.listen_on(addr).is_ok()); + let listeners = raw_swarm.listeners().collect::>(); + assert_eq!(listeners.len(), 1); + assert_eq!(listeners[0], &addr2); + } + + #[test] + fn nat_traversal_transforms_the_observed_address_according_to_the_transport_used() { + // the DummyTransport nat_traversal increments the port number by one for Ip4 addresses + let transport = DummyTransport::new(); + let mut raw_swarm = RawSwarm::<_, _, _, Handler>::new(transport); + let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + // An unrelated outside address is returned as-is, no transform + let outside_addr1 = "/memory".parse::().expect("bad multiaddr"); + + let addr2 = "/ip4/127.0.0.2/tcp/1234".parse::().expect("bad multiaddr"); + let outside_addr2 = "/ip4/127.0.0.2/tcp/1234".parse::().expect("bad multiaddr"); + + raw_swarm.listen_on(addr1).unwrap(); + raw_swarm.listen_on(addr2).unwrap(); + + let natted = raw_swarm + .nat_traversal(&outside_addr1) + .map(|a| a.to_string()) + .collect::>(); + + assert!(natted.is_empty()); + + let natted = raw_swarm + .nat_traversal(&outside_addr2) + .map(|a| a.to_string()) + .collect::>(); + + assert_eq!(natted, vec!["/ip4/127.0.0.2/tcp/1234"]) + } + + #[test] + fn successful_dial_reaches_a_node() { + let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + let dial_res = swarm.dial(addr, Handler::default()); + assert!(dial_res.is_ok()); + + // Poll the swarm until we get a `NodeReached` then assert on the peer: + // it's there and it's connected. + let swarm = Arc::new(Mutex::new(swarm)); + + let mut rt = Runtime::new().unwrap(); + let mut peer_id : Option = None; + // Drive forward until we're Connected + while peer_id.is_none() { + let swarm_fut = swarm.clone(); + peer_id = rt.block_on(future::poll_fn(move || -> Poll, ()> { + let mut swarm = swarm_fut.lock(); + let poll_res = swarm.poll(); + match poll_res { + Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), + _ => Ok(Async::Ready(None)) + } + })).expect("tokio works"); + } + + let mut swarm = swarm.lock(); + let peer = swarm.peer(peer_id.unwrap()); + assert_matches!(peer, Peer::Connected(PeerConnected{..})); + } + + #[test] + fn num_incoming_negotiated() { + let mut transport = DummyTransport::new(); + let peer_id = PeerId::random(); + let muxer = DummyMuxer::new(); + + // Set up listener to see an incoming connection + transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some((peer_id, muxer))))); + + let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport); + swarm.listen_on("/memory".parse().unwrap()).unwrap(); + + // no incoming yet + assert_eq!(swarm.num_incoming_negotiated(), 0); + + let mut rt = Runtime::new().unwrap(); + let swarm = Arc::new(Mutex::new(swarm)); + let swarm_fut = swarm.clone(); + let fut = future::poll_fn(move || -> Poll<_, ()> { + let mut swarm_fut = swarm_fut.lock(); + assert_matches!(swarm_fut.poll(), Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => { + incoming.accept(Handler::default()); + }); + + Ok(Async::Ready(())) + }); + rt.block_on(fut).expect("tokio works"); + let swarm = swarm.lock(); + // Now there's an incoming connection + assert_eq!(swarm.num_incoming_negotiated(), 1); + } + + #[test] + fn broadcasted_events_reach_active_nodes() { + let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + let mut muxer = DummyMuxer::new(); + muxer.set_inbound_connection_state(DummyConnectionState::Pending); + muxer.set_outbound_connection_state(DummyConnectionState::Opened); + let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); + let mut handler = Handler::default(); + handler.next_states = vec![HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") ))),]; + let dial_result = swarm.dial(addr, handler); + assert!(dial_result.is_ok()); + + swarm.broadcast_event(&InEvent::NextState); + let swarm = Arc::new(Mutex::new(swarm)); + let mut rt = Runtime::new().unwrap(); + let mut peer_id : Option = None; + while peer_id.is_none() { + let swarm_fut = swarm.clone(); + peer_id = rt.block_on(future::poll_fn(move || -> Poll, ()> { + let mut swarm = swarm_fut.lock(); + let poll_res = swarm.poll(); + match poll_res { + Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), + _ => Ok(Async::Ready(None)) + } + })).expect("tokio works"); + } + + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + match swarm.poll() { + Async::Ready(event) => { + assert_matches!(event, RawSwarmEvent::NodeEvent { peer_id: _, event: inner_event } => { + // The event we sent reached the node and triggered sending the out event we told it to return + assert_matches!(inner_event, OutEvent::Custom("from handler 1")); + }); + Ok(Async::Ready(false)) + }, + _ => Ok(Async::Ready(true)) + } + })).expect("tokio works"); + } + } + + #[test] + fn querying_for_pending_peer() { + let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + let peer_id = PeerId::random(); + let peer = swarm.peer(peer_id.clone()); + assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. })); + let addr = "/memory".parse().expect("bad multiaddr"); + let pending_peer = peer.as_not_connected().unwrap().connect(addr, Handler::default()); + assert!(pending_peer.is_ok()); + assert_matches!(pending_peer, Ok(PeerPendingConnect { .. } )); + } + + #[test] + fn querying_for_unknown_peer() { + let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + let peer_id = PeerId::random(); + let peer = swarm.peer(peer_id.clone()); + assert_matches!(peer, Peer::NotConnected( PeerNotConnected { nodes: _, peer_id: node_peer_id }) => { + assert_eq!(node_peer_id, peer_id); + }); + } + + #[test] + fn querying_for_connected_peer() { + let mut swarm = RawSwarm::<_, _, _, Handler>::new(DummyTransport::new()); + + // Dial a node + let addr = "/ip4/127.0.0.1/tcp/1234".parse().expect("bad multiaddr"); + swarm.dial(addr, Handler::default()).expect("dialing works"); + + let swarm = Arc::new(Mutex::new(swarm)); + let mut rt = Runtime::new().unwrap(); + // Drive it forward until we connect; extract the new PeerId. + let mut peer_id : Option = None; + while peer_id.is_none() { + let swarm_fut = swarm.clone(); + peer_id = rt.block_on(future::poll_fn(move || -> Poll, ()> { + let mut swarm = swarm_fut.lock(); + let poll_res = swarm.poll(); + match poll_res { + Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))), + _ => Ok(Async::Ready(None)) + } + })).expect("tokio works"); + } + + // We're connected. + let mut swarm = swarm.lock(); + let peer = swarm.peer(peer_id.unwrap()); + assert_matches!(peer, Peer::Connected( PeerConnected { .. } )); + } + + #[test] + fn poll_with_closed_listener() { + let mut transport = DummyTransport::new(); + // Set up listener to be closed + transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(None))); + + let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport); + swarm.listen_on("/memory".parse().unwrap()).unwrap(); + + let mut rt = Runtime::new().unwrap(); + let swarm = Arc::new(Mutex::new(swarm)); + + let swarm_fut = swarm.clone(); + let fut = future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::ListenerClosed { .. } )); + Ok(Async::Ready(())) + }); + rt.block_on(fut).expect("tokio works"); + } + + #[test] + fn unknown_peer_that_is_unreachable_yields_unknown_peer_dial_error() { + let mut transport = DummyTransport::new(); + transport.make_dial_fail(); + let mut swarm = RawSwarm::<_, _, _, Handler>::new(transport); + let addr = "/memory".parse::().expect("bad multiaddr"); + let handler = Handler::default(); + let dial_result = swarm.dial(addr, handler); + assert!(dial_result.is_ok()); + + let swarm = Arc::new(Mutex::new(swarm)); + let mut rt = Runtime::new().unwrap(); + // Drive it forward until we hear back from the node. + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + match swarm.poll() { + Async::NotReady => Ok(Async::Ready(true)), + Async::Ready(event) => { + assert_matches!(event, RawSwarmEvent::UnknownPeerDialError { .. } ); + Ok(Async::Ready(false)) + }, + } + })).expect("tokio works"); + } + } + + #[test] + fn known_peer_that_is_unreachable_yields_dial_error() { + let mut transport = DummyTransport::new(); + let peer_id = PeerId::random(); + transport.set_next_peer_id(&peer_id); + transport.make_dial_fail(); + let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport))); + + { + let swarm1 = swarm.clone(); + let mut swarm1 = swarm1.lock(); + let peer = swarm1.peer(peer_id.clone()); + assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. })); + let addr = "/memory".parse::().expect("bad multiaddr"); + let pending_peer = peer.as_not_connected().unwrap().connect(addr, Handler::default()); + assert!(pending_peer.is_ok()); + assert_matches!(pending_peer, Ok(PeerPendingConnect { .. } )); + } + let mut rt = Runtime::new().unwrap(); + // Drive it forward until we hear back from the node. + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + let peer_id = peer_id.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + match swarm.poll() { + Async::NotReady => Ok(Async::Ready(true)), + Async::Ready(event) => { + let failed_peer_id = assert_matches!( + event, + RawSwarmEvent::DialError { remain_addrs_attempt: _, peer_id: failed_peer_id, .. } => failed_peer_id + ); + assert_eq!(peer_id, failed_peer_id); + Ok(Async::Ready(false)) + }, + } + })).expect("tokio works"); + } + } + + #[test] + fn yields_node_error_when_there_is_an_error_after_successful_connect() { + let mut transport = DummyTransport::new(); + let peer_id = PeerId::random(); + transport.set_next_peer_id(&peer_id); + let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport))); + + { + // Set up an outgoing connection with a PeerId we know + let swarm1 = swarm.clone(); + let mut swarm1 = swarm1.lock(); + let peer = swarm1.peer(peer_id.clone()); + let addr = "/unix/reachable".parse().expect("bad multiaddr"); + let mut handler = Handler::default(); + // Force an error + handler.next_states = vec![ HandlerState::Err ]; + peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer"); + } + + // Ensure we run on a single thread + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Drive it forward until we connect to the node. + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + // Push the Handler into an error state on the next poll + swarm.broadcast_event(&InEvent::NextState); + match swarm.poll() { + Async::NotReady => Ok(Async::Ready(true)), + Async::Ready(event) => { + assert_matches!(event, RawSwarmEvent::Connected { .. }); + // We're connected, we can move on + Ok(Async::Ready(false)) + }, + } + })).expect("tokio works"); + } + + // Poll again. It is going to be a NodeError because of how the + // handler's next state was set up. + let swarm_fut = swarm.clone(); + let expected_peer_id = peer_id.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeError { peer_id, .. }) => { + assert_eq!(peer_id, expected_peer_id); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + } + + #[test] + fn yields_node_closed_when_the_node_closes_after_successful_connect() { + let mut transport = DummyTransport::new(); + let peer_id = PeerId::random(); + transport.set_next_peer_id(&peer_id); + let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler>::new(transport))); + + { + // Set up an outgoing connection with a PeerId we know + let swarm1 = swarm.clone(); + let mut swarm1 = swarm1.lock(); + let peer = swarm1.peer(peer_id.clone()); + let addr = "/unix/reachable".parse().expect("bad multiaddr"); + let mut handler = Handler::default(); + // Force handler to close + handler.next_states = vec![ HandlerState::Ready(None) ]; + peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer"); + } + + // Ensure we run on a single thread + let mut rt = Builder::new().core_threads(1).build().unwrap(); + + // Drive it forward until we connect to the node. + let mut keep_polling = true; + while keep_polling { + let swarm_fut = swarm.clone(); + keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + // Push the Handler into the closed state on the next poll + swarm.broadcast_event(&InEvent::NextState); + match swarm.poll() { + Async::NotReady => Ok(Async::Ready(true)), + Async::Ready(event) => { + assert_matches!(event, RawSwarmEvent::Connected { .. }); + // We're connected, we can move on + Ok(Async::Ready(false)) + }, + } + })).expect("tokio works"); + } + + // Poll again. It is going to be a NodeClosed because of how the + // handler's next state was set up. + let swarm_fut = swarm.clone(); + let expected_peer_id = peer_id.clone(); + rt.block_on(future::poll_fn(move || -> Poll<_, ()> { + let mut swarm = swarm_fut.lock(); + assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeClosed { peer_id, .. }) => { + assert_eq!(peer_id, expected_peer_id); + }); + Ok(Async::Ready(())) + })).expect("tokio works"); + } +} diff --git a/core/src/tests/dummy_muxer.rs b/core/src/tests/dummy_muxer.rs index 7e5a41b7..75d90a0b 100644 --- a/core/src/tests/dummy_muxer.rs +++ b/core/src/tests/dummy_muxer.rs @@ -42,14 +42,14 @@ pub enum DummyConnectionState { Closed, // use this to trigger the Async::Ready(None) code path Opened, // use this to trigger the Async::Ready(Some(_)) code path } -#[derive(Debug, Clone)] +#[derive(Debug, PartialEq, Clone)] struct DummyConnection { state: DummyConnectionState, } /// `DummyMuxer` implements `StreamMuxer` and methods to control its behaviour when used in tests -#[derive(Debug, Clone)] -pub struct DummyMuxer { +#[derive(Debug, PartialEq, Clone)] +pub struct DummyMuxer{ in_connection: DummyConnection, out_connection: DummyConnection, } diff --git a/core/src/tests/dummy_transport.rs b/core/src/tests/dummy_transport.rs index d69fe691..7c2a927f 100644 --- a/core/src/tests/dummy_transport.rs +++ b/core/src/tests/dummy_transport.rs @@ -28,33 +28,47 @@ use futures::{ stream, }; use std::io; -use {Multiaddr, Transport}; +use {Multiaddr, PeerId, Transport}; +use tests::dummy_muxer::DummyMuxer; -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Debug, PartialEq, Clone)] pub(crate) enum ListenerState { - /// The `usize` indexes items produced by the listener - Ok(Async>), - Error, + Ok(Async>), + Error } -#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(Debug, PartialEq, Clone)] pub(crate) struct DummyTransport { + /// The current state of Listeners. listener_state: ListenerState, + /// The next peer returned from dial(). + next_peer_id: Option, + /// When true, all dial attempts return error. + dial_should_fail: bool, } impl DummyTransport { pub(crate) fn new() -> Self { DummyTransport { listener_state: ListenerState::Ok(Async::NotReady), + next_peer_id: None, + dial_should_fail: false, } } pub(crate) fn set_initial_listener_state(&mut self, state: ListenerState) { self.listener_state = state; } + + pub(crate) fn set_next_peer_id(&mut self, peer_id: &PeerId) { + self.next_peer_id = Some(peer_id.clone()); + } + + pub(crate) fn make_dial_fail(&mut self) { + self.dial_should_fail = true; + } } impl Transport for DummyTransport { - type Output = usize; - type Listener = - Box + Send>; + type Output = (PeerId, DummyMuxer); + type Listener = Box + Send>; type ListenerUpgrade = FutureResult; type Dial = Box + Send>; @@ -71,8 +85,8 @@ impl Transport for DummyTransport { let stream = stream::poll_fn(|| Ok(Async::NotReady)).map(tupelize); (Box::new(stream), addr2) } - Async::Ready(Some(n)) => { - let stream = stream::iter_ok(n..).map(tupelize); + Async::Ready(Some(tup)) => { + let stream = stream::poll_fn(move || Ok( Async::Ready(Some(tup.clone()) ))).map(tupelize); (Box::new(stream), addr2) } Async::Ready(None) => { @@ -89,10 +103,28 @@ impl Transport for DummyTransport { where Self: Sized, { - unimplemented!(); + let peer_id = if let Some(peer_id) = self.next_peer_id { + peer_id + } else { + PeerId::random() + }; + + let fut = + if self.dial_should_fail { + let err_string = format!("unreachable host error, peer={:?}", peer_id); + future::err(io::Error::new(io::ErrorKind::Other, err_string)) + } else { + future::ok((peer_id, DummyMuxer::new())) + }; + + Ok(Box::new(fut)) } - fn nat_traversal(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { - unimplemented!(); + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + if server == observed { + Some(observed.clone()) + } else { + None + } } } diff --git a/core/src/transport/mod.rs b/core/src/transport/mod.rs index 9890b0d1..04dbff8a 100644 --- a/core/src/transport/mod.rs +++ b/core/src/transport/mod.rs @@ -94,7 +94,7 @@ pub trait Transport { where Self: Sized; - /// Dial to the given multi-addr. + /// Dial the given multi-addr. /// /// Returns either a future which may resolve to a connection, or gives back the multiaddress. fn dial(self, addr: Multiaddr) -> Result @@ -114,7 +114,7 @@ pub trait Transport { /// as is. /// /// Returns `None` if nothing can be determined. This happens if this trait implementation - /// doesn't recognize the protocols, or if `server` and `observed` are related. + /// doesn't recognize the protocols, or if `server` and `observed` are not related. fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; /// Turns this `Transport` into an abstract boxed transport.