mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-18 12:31:22 +00:00
Tests for HandledNode (#546)
* Add unit tests for core::nodes::NodeStream * Move DummyMuxer to core/tests * Address grumbles * Impl Debug for SubstreamRef<P> * Add test for poll() * Don't need to open a substream * pretty printer test * More tests for NodeStream poll() * ListenerStream unit tests: transport() and listeners() * Tests for nodes/listeners.rs * Add a few tests to help illustrate the "drowning" behaviour of busy listeners * Tests for HandledNode * Address grumbles * Remove non-project specific stuff * Address grumbles * Prefer freestanding function * Untangle the code for old shutdown test from the new tests Add HandlerState and use it in TestBuilder Shorter test names * WIP – tests pass * Use a newtype to lighten up the function signatures a bit Test NotReady case * Cleanup Event enum Track events as they reach the Handler Describe complex test logic * Assert on the event trace * More tests for poll() * Switch to using usize as the OutboundOpenInfo so we can assert on event contents More tests for poll() * whitespace * Move Handler related code to dummy_handler * Fixes broken test after upstream changes * Clarify the behaviour of is_shutting_down * Fix broken test * Fix tests after recent changes on master * no tabs * whitespace * rustfmt * Add public HandledNode.handler() method that returns a ref to the NodeHandler Don't use private members in tests * Add HandledNode.handler_mut that returns a mutable ref to the NodeHandler * Remove debugging stmts * Fix parse error
This commit is contained in:
@ -65,7 +65,7 @@ pub trait NodeHandler {
|
|||||||
/// Injects an event coming from the outside into the handler.
|
/// Injects an event coming from the outside into the handler.
|
||||||
fn inject_event(&mut self, event: Self::InEvent);
|
fn inject_event(&mut self, event: Self::InEvent);
|
||||||
|
|
||||||
/// Indicates that the node that it should shut down. After that, it is expected that `poll()`
|
/// Indicates to the node that it should shut down. After that, it is expected that `poll()`
|
||||||
/// returns `Ready(None)` as soon as possible.
|
/// returns `Ready(None)` as soon as possible.
|
||||||
///
|
///
|
||||||
/// This method allows an implementation to perform a graceful shutdown of the substreams, and
|
/// This method allows an implementation to perform a graceful shutdown of the substreams, and
|
||||||
@ -176,6 +176,16 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns a reference to the `NodeHandler`
|
||||||
|
pub fn handler(&self) -> &THandler{
|
||||||
|
&self.handler
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a mutable reference to the `NodeHandler`
|
||||||
|
pub fn handler_mut(&mut self) -> &mut THandler{
|
||||||
|
&mut self.handler
|
||||||
|
}
|
||||||
|
|
||||||
/// Injects an event to the handler.
|
/// Injects an event to the handler.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn inject_event(&mut self, event: THandler::InEvent) {
|
pub fn inject_event(&mut self, event: THandler::InEvent) {
|
||||||
@ -286,7 +296,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Async::NotReady)
|
Ok(Async::NotReady)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -294,44 +303,81 @@ where
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use muxing::{StreamMuxer, Shutdown};
|
|
||||||
use std::marker::PhantomData;
|
|
||||||
use tokio::runtime::current_thread;
|
use tokio::runtime::current_thread;
|
||||||
|
use tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
|
||||||
|
use tests::dummy_handler::{Handler, HandlerState, Event};
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
// TODO: move somewhere? this could be useful as a dummy
|
// Concrete `HandledNode`
|
||||||
struct InstaCloseMuxer;
|
type TestHandledNode = HandledNode<DummyMuxer, Handler>;
|
||||||
impl StreamMuxer for InstaCloseMuxer {
|
|
||||||
type Substream = ();
|
struct TestBuilder {
|
||||||
type OutboundSubstream = ();
|
muxer: DummyMuxer,
|
||||||
fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError> { Ok(Async::Ready(None)) }
|
handler: Handler,
|
||||||
fn open_outbound(&self) -> Self::OutboundSubstream { () }
|
want_open_substream: bool,
|
||||||
fn poll_outbound(&self, _: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError> { Ok(Async::Ready(None)) }
|
substream_user_data: usize,
|
||||||
fn destroy_outbound(&self, _: Self::OutboundSubstream) {}
|
}
|
||||||
fn read_substream(&self, _: &mut Self::Substream, _: &mut [u8]) -> Poll<usize, IoError> { panic!() }
|
|
||||||
fn write_substream(&self, _: &mut Self::Substream, _: &[u8]) -> Poll<usize, IoError> { panic!() }
|
impl TestBuilder {
|
||||||
fn flush_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> { panic!() }
|
fn new() -> Self {
|
||||||
fn shutdown_substream(&self, _: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { panic!() }
|
TestBuilder {
|
||||||
fn destroy_substream(&self, _: Self::Substream) { panic!() }
|
muxer: DummyMuxer::new(),
|
||||||
fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { Ok(Async::Ready(())) }
|
handler: Handler::default(),
|
||||||
fn flush_all(&self) -> Poll<(), IoError> { Ok(Async::Ready(())) }
|
want_open_substream: false,
|
||||||
|
substream_user_data: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_muxer_inbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
|
||||||
|
self.muxer.set_inbound_connection_state(state);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_muxer_outbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
|
||||||
|
self.muxer.set_outbound_connection_state(state);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_handler_state(&mut self, state: HandlerState) -> &mut Self {
|
||||||
|
self.handler.state = Some(state);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_open_substream(&mut self, user_data: usize) -> &mut Self {
|
||||||
|
self.want_open_substream = true;
|
||||||
|
self.substream_user_data = user_data;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handled_node(&mut self) -> TestHandledNode {
|
||||||
|
let mut h = HandledNode::new(self.muxer.clone(), self.handler.clone());
|
||||||
|
if self.want_open_substream {
|
||||||
|
h.node.get_mut().open_substream(self.substream_user_data).expect("open substream should work");
|
||||||
|
}
|
||||||
|
h
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the state of the `Handler` after `inject_outbound_closed` is called
|
||||||
|
fn set_next_handler_outbound_state( handled_node: &mut TestHandledNode, next_state: HandlerState) {
|
||||||
|
handled_node.handler.next_outbound_state = Some(next_state);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn proper_shutdown() {
|
fn proper_shutdown() {
|
||||||
// Test that `shutdown()` is properly called on the handler once a node stops.
|
struct ShutdownHandler<T> {
|
||||||
struct Handler<T> {
|
|
||||||
did_substream_attempt: bool,
|
did_substream_attempt: bool,
|
||||||
inbound_closed: bool,
|
inbound_closed: bool,
|
||||||
substream_attempt_cancelled: bool,
|
substream_attempt_cancelled: bool,
|
||||||
shutdown_called: bool,
|
shutdown_called: bool,
|
||||||
marker: PhantomData<T>,
|
marker: PhantomData<T>
|
||||||
};
|
}
|
||||||
impl<T> NodeHandler for Handler<T> {
|
impl<T> NodeHandler for ShutdownHandler<T> {
|
||||||
type InEvent = ();
|
type InEvent = ();
|
||||||
type OutEvent = ();
|
type OutEvent = ();
|
||||||
type Substream = T;
|
type Substream = T;
|
||||||
type OutboundOpenInfo = ();
|
type OutboundOpenInfo = ();
|
||||||
fn inject_substream(&mut self, _: T, _: NodeHandlerEndpoint<()>) { panic!() }
|
fn inject_substream(&mut self, _: Self::Substream, _: NodeHandlerEndpoint<Self::OutboundOpenInfo>) { panic!() }
|
||||||
fn inject_inbound_closed(&mut self) {
|
fn inject_inbound_closed(&mut self) {
|
||||||
assert!(!self.inbound_closed);
|
assert!(!self.inbound_closed);
|
||||||
self.inbound_closed = true;
|
self.inbound_closed = true;
|
||||||
@ -357,13 +403,20 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
impl<T> Drop for Handler<T> {
|
|
||||||
|
impl<T> Drop for ShutdownHandler<T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
|
if self.did_substream_attempt {
|
||||||
assert!(self.shutdown_called);
|
assert!(self.shutdown_called);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let handled = HandledNode::new(InstaCloseMuxer, Handler {
|
// Test that `shutdown()` is properly called on the handler once a node stops.
|
||||||
|
let mut muxer = DummyMuxer::new();
|
||||||
|
muxer.set_inbound_connection_state(DummyConnectionState::Closed);
|
||||||
|
muxer.set_outbound_connection_state(DummyConnectionState::Closed);
|
||||||
|
let handled = HandledNode::new(muxer, ShutdownHandler {
|
||||||
did_substream_attempt: false,
|
did_substream_attempt: false,
|
||||||
inbound_closed: false,
|
inbound_closed: false,
|
||||||
substream_attempt_cancelled: false,
|
substream_attempt_cancelled: false,
|
||||||
@ -373,4 +426,227 @@ mod tests {
|
|||||||
|
|
||||||
current_thread::Runtime::new().unwrap().block_on(handled.for_each(|_| Ok(()))).unwrap();
|
current_thread::Runtime::new().unwrap().block_on(handled.for_each(|_| Ok(()))).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn can_inject_event() {
|
||||||
|
let mut handled = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Closed)
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
let event = Event::Custom("banana");
|
||||||
|
handled.inject_event(event.clone());
|
||||||
|
assert_eq!(handled.handler().events, vec![event]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn knows_if_inbound_is_closed() {
|
||||||
|
let mut handled = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Closed)
|
||||||
|
.with_handler_state(HandlerState::Ready(None)) // or we get into an infinite loop
|
||||||
|
.handled_node();
|
||||||
|
handled.poll().expect("poll failed");
|
||||||
|
assert!(!handled.is_inbound_open())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn knows_if_outbound_is_closed() {
|
||||||
|
let mut handled = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Pending)
|
||||||
|
.with_muxer_outbound_state(DummyConnectionState::Closed)
|
||||||
|
.with_handler_state(HandlerState::Ready(None)) // or we get into an infinite loop
|
||||||
|
.with_open_substream(987) // without at least one substream we do not poll_outbound so we never get the event
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
handled.poll().expect("poll failed");
|
||||||
|
assert!(!handled.is_outbound_open());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn is_shutting_down_is_true_when_called_shutdown_on_the_handled_node() {
|
||||||
|
let mut handled = TestBuilder::new()
|
||||||
|
.with_handler_state(HandlerState::Ready(None)) // Stop the loop towards the end of the first run
|
||||||
|
.handled_node();
|
||||||
|
assert!(!handled.is_shutting_down());
|
||||||
|
handled.poll().expect("poll should work");
|
||||||
|
handled.shutdown();
|
||||||
|
assert!(handled.is_shutting_down());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn is_shutting_down_is_true_when_in_and_outbounds_are_closed() {
|
||||||
|
let mut handled = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Closed)
|
||||||
|
.with_muxer_outbound_state(DummyConnectionState::Closed)
|
||||||
|
.with_open_substream(123) // avoid infinite loop
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
handled.poll().expect("poll should work");
|
||||||
|
|
||||||
|
// Shutting down (in- and outbound are closed, and the handler is shutdown)
|
||||||
|
assert!(handled.is_shutting_down());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn is_shutting_down_is_true_when_handler_is_gone() {
|
||||||
|
// when in-/outbound NodeStreams are open or Async::Ready(None) we reach the handlers `poll()` and initiate shutdown.
|
||||||
|
let mut handled = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Pending)
|
||||||
|
.with_muxer_outbound_state(DummyConnectionState::Pending)
|
||||||
|
.with_handler_state(HandlerState::Ready(None)) // avoid infinite loop
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
handled.poll().expect("poll should work");
|
||||||
|
|
||||||
|
assert!(handled.is_shutting_down());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn is_shutting_down_is_true_when_handler_is_gone_even_if_in_and_outbounds_are_open() {
|
||||||
|
let mut handled = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Opened)
|
||||||
|
.with_muxer_outbound_state(DummyConnectionState::Opened)
|
||||||
|
.with_open_substream(123)
|
||||||
|
.with_handler_state(HandlerState::Ready(None))
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
handled.poll().expect("poll should work");
|
||||||
|
|
||||||
|
assert!(handled.is_shutting_down());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_with_unready_node_stream_polls_handler() {
|
||||||
|
let mut handled = TestBuilder::new()
|
||||||
|
// make NodeStream return NotReady
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Pending)
|
||||||
|
// make Handler return return Ready(None) so we break the infinite loop
|
||||||
|
.with_handler_state(HandlerState::Ready(None))
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
assert_matches!(handled.poll(), Ok(Async::Ready(None)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_with_unready_node_stream_and_handler_emits_custom_event() {
|
||||||
|
let expected_event = Some(NodeHandlerEvent::Custom(Event::Custom("pineapple")));
|
||||||
|
let mut handled = TestBuilder::new()
|
||||||
|
// make NodeStream return NotReady
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Pending)
|
||||||
|
// make Handler return return Ready(Some(…))
|
||||||
|
.with_handler_state(HandlerState::Ready(expected_event))
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
assert_matches!(handled.poll(), Ok(Async::Ready(Some(event))) => {
|
||||||
|
assert_matches!(event, Event::Custom("pineapple"))
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn handler_emits_outbound_closed_when_opening_new_substream_on_closed_node() {
|
||||||
|
let open_event = Some(NodeHandlerEvent::OutboundSubstreamRequest(456));
|
||||||
|
let mut handled = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Pending)
|
||||||
|
.with_muxer_outbound_state(DummyConnectionState::Closed)
|
||||||
|
.with_handler_state(HandlerState::Ready(open_event))
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
set_next_handler_outbound_state(
|
||||||
|
&mut handled,
|
||||||
|
HandlerState::Ready(Some(NodeHandlerEvent::Custom(Event::Custom("pear"))))
|
||||||
|
);
|
||||||
|
handled.poll().expect("poll works");
|
||||||
|
assert_eq!(handled.handler().events, vec![Event::OutboundClosed]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_returns_not_ready_when_node_stream_and_handler_is_not_ready() {
|
||||||
|
let mut handled = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Closed)
|
||||||
|
.with_muxer_outbound_state(DummyConnectionState::Closed)
|
||||||
|
.with_open_substream(12)
|
||||||
|
.with_handler_state(HandlerState::NotReady)
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
// Under the hood, this is what happens when calling `poll()`:
|
||||||
|
// - we reach `node.poll_inbound()` and because the connection is
|
||||||
|
// closed, `inbound_finished` is set to true.
|
||||||
|
// - an Async::Ready(NodeEvent::InboundClosed) is yielded (also calls
|
||||||
|
// `inject_inbound_close`, but that's irrelevant here)
|
||||||
|
// - back in `poll()` we cal `handler.poll()` which does nothing because
|
||||||
|
// `HandlerState` is `NotReady`: loop continues
|
||||||
|
// - polls the node again which now skips the inbound block because
|
||||||
|
// `inbound_finished` is true.
|
||||||
|
// - Now `poll_outbound()` is called which returns `Async::Ready(None)`
|
||||||
|
// and sets `outbound_finished` to true. …calls destroy_outbound and
|
||||||
|
// yields Ready(OutboundClosed) …so the HandledNode calls
|
||||||
|
// `inject_outbound_closed`.
|
||||||
|
// - Now we have `inbound_finished` and `outbound_finished` set (and no
|
||||||
|
// more outbound substreams).
|
||||||
|
// - Next we poll the handler again which again does nothing because
|
||||||
|
// HandlerState is NotReady (and the node is still there)
|
||||||
|
// - HandledNode polls the node again: we skip inbound and there are no
|
||||||
|
// more outbound substreams so we skip that too; the addr is now
|
||||||
|
// Resolved so that part is skipped too
|
||||||
|
// - We reach the last section and the NodeStream yields Async::Ready(None)
|
||||||
|
// - Back in HandledNode the Async::Ready(None) triggers a shutdown
|
||||||
|
// – …and causes the Handler to yield Async::Ready(None)
|
||||||
|
// – which in turn makes the HandledNode to yield Async::Ready(None) as well
|
||||||
|
assert_matches!(handled.poll(), Ok(Async::Ready(None)));
|
||||||
|
assert_eq!(handled.handler().events, vec![
|
||||||
|
Event::InboundClosed, Event::OutboundClosed
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_yields_inbound_closed_event() {
|
||||||
|
let mut h = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Closed)
|
||||||
|
.with_handler_state(HandlerState::Err) // stop the loop
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
assert_eq!(h.handler().events, vec![]);
|
||||||
|
let _ = h.poll();
|
||||||
|
assert_eq!(h.handler().events, vec![Event::InboundClosed]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_yields_outbound_closed_event() {
|
||||||
|
let mut h = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Pending)
|
||||||
|
.with_open_substream(32)
|
||||||
|
.with_muxer_outbound_state(DummyConnectionState::Closed)
|
||||||
|
.with_handler_state(HandlerState::Err) // stop the loop
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
assert_eq!(h.handler().events, vec![]);
|
||||||
|
let _ = h.poll();
|
||||||
|
assert_eq!(h.handler().events, vec![Event::OutboundClosed]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_yields_outbound_substream() {
|
||||||
|
let mut h = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Pending)
|
||||||
|
.with_muxer_outbound_state(DummyConnectionState::Opened)
|
||||||
|
.with_open_substream(1)
|
||||||
|
.with_handler_state(HandlerState::Err) // stop the loop
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
assert_eq!(h.handler().events, vec![]);
|
||||||
|
let _ = h.poll();
|
||||||
|
assert_eq!(h.handler().events, vec![Event::Substream(Some(1))]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn poll_yields_inbound_substream() {
|
||||||
|
let mut h = TestBuilder::new()
|
||||||
|
.with_muxer_inbound_state(DummyConnectionState::Opened)
|
||||||
|
.with_muxer_outbound_state(DummyConnectionState::Pending)
|
||||||
|
.with_handler_state(HandlerState::Err) // stop the loop
|
||||||
|
.handled_node();
|
||||||
|
|
||||||
|
assert_eq!(h.handler().events, vec![]);
|
||||||
|
let _ = h.poll();
|
||||||
|
assert_eq!(h.handler().events, vec![Event::Substream(None)]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -306,7 +306,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Closing the node if there's no way we can do anything more.
|
// Closing the node if there's no way we can do anything more.
|
||||||
if self.inbound_state == StreamState::Closed
|
if self.inbound_state == StreamState::Closed
|
||||||
&& self.outbound_state == StreamState::Closed
|
&& self.outbound_state == StreamState::Closed
|
||||||
|
105
core/src/tests/dummy_handler.rs
Normal file
105
core/src/tests/dummy_handler.rs
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
//! Concrete `NodeHandler` implementation and assorted testing types
|
||||||
|
|
||||||
|
use std::io::{self, Error as IoError};
|
||||||
|
|
||||||
|
use super::dummy_muxer::DummyMuxer;
|
||||||
|
use futures::prelude::*;
|
||||||
|
use muxing::SubstreamRef;
|
||||||
|
use nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Clone)]
|
||||||
|
pub(crate) struct Handler {
|
||||||
|
pub events: Vec<Event>,
|
||||||
|
pub state: Option<HandlerState>,
|
||||||
|
pub next_outbound_state: Option<HandlerState>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Handler {
|
||||||
|
fn default() -> Self {
|
||||||
|
Handler {
|
||||||
|
events: Vec::new(),
|
||||||
|
state: None,
|
||||||
|
next_outbound_state: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Clone)]
|
||||||
|
pub(crate) enum HandlerState {
|
||||||
|
NotReady,
|
||||||
|
Ready(Option<NodeHandlerEvent<usize, Event>>),
|
||||||
|
Err,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Clone)]
|
||||||
|
pub(crate) enum Event {
|
||||||
|
Custom(&'static str),
|
||||||
|
Substream(Option<usize>),
|
||||||
|
OutboundClosed,
|
||||||
|
InboundClosed,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NodeHandler for Handler {
|
||||||
|
type InEvent = Event;
|
||||||
|
type OutEvent = Event;
|
||||||
|
type OutboundOpenInfo = usize;
|
||||||
|
type Substream = SubstreamRef<Arc<DummyMuxer>>;
|
||||||
|
fn inject_substream(
|
||||||
|
&mut self,
|
||||||
|
_: Self::Substream,
|
||||||
|
endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>,
|
||||||
|
) {
|
||||||
|
let user_data = match endpoint {
|
||||||
|
NodeHandlerEndpoint::Dialer(user_data) => Some(user_data),
|
||||||
|
NodeHandlerEndpoint::Listener => None,
|
||||||
|
};
|
||||||
|
self.events.push(Event::Substream(user_data));
|
||||||
|
}
|
||||||
|
fn inject_inbound_closed(&mut self) {
|
||||||
|
self.events.push(Event::InboundClosed);
|
||||||
|
}
|
||||||
|
fn inject_outbound_closed(&mut self, _: usize) {
|
||||||
|
self.events.push(Event::OutboundClosed);
|
||||||
|
if let Some(ref state) = self.next_outbound_state {
|
||||||
|
self.state = Some(state.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn inject_event(&mut self, inevent: Self::InEvent) {
|
||||||
|
self.events.push(inevent)
|
||||||
|
}
|
||||||
|
fn shutdown(&mut self) {
|
||||||
|
self.state = Some(HandlerState::Ready(None));
|
||||||
|
}
|
||||||
|
fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<usize, Event>>, IoError> {
|
||||||
|
match self.state {
|
||||||
|
Some(ref state) => match state {
|
||||||
|
HandlerState::NotReady => Ok(Async::NotReady),
|
||||||
|
HandlerState::Ready(None) => Ok(Async::Ready(None)),
|
||||||
|
HandlerState::Ready(Some(event)) => Ok(Async::Ready(Some(event.clone()))),
|
||||||
|
HandlerState::Err => Err(io::Error::new(io::ErrorKind::Other, "oh noes")),
|
||||||
|
},
|
||||||
|
None => Ok(Async::NotReady),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -22,8 +22,6 @@
|
|||||||
//! version of the trait along with a way to setup the muxer to behave in the
|
//! version of the trait along with a way to setup the muxer to behave in the
|
||||||
//! desired way when testing other components.
|
//! desired way when testing other components.
|
||||||
|
|
||||||
extern crate futures;
|
|
||||||
|
|
||||||
use std::io::Error as IoError;
|
use std::io::Error as IoError;
|
||||||
use muxing::{StreamMuxer, Shutdown};
|
use muxing::{StreamMuxer, Shutdown};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
@ -36,19 +34,19 @@ pub struct DummyOutboundSubstream {}
|
|||||||
|
|
||||||
/// Control the muxer state by setting the "connection" state as to set up a mock
|
/// Control the muxer state by setting the "connection" state as to set up a mock
|
||||||
/// muxer for higher level components.
|
/// muxer for higher level components.
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq, Clone)]
|
||||||
pub enum DummyConnectionState {
|
pub enum DummyConnectionState {
|
||||||
Pending, // use this to trigger the Async::NotReady code path
|
Pending, // use this to trigger the Async::NotReady code path
|
||||||
Closed, // use this to trigger the Async::Ready(None) code path
|
Closed, // use this to trigger the Async::Ready(None) code path
|
||||||
Opened, // use this to trigger the Async::Ready(Some(_)) code path
|
Opened, // use this to trigger the Async::Ready(Some(_)) code path
|
||||||
}
|
}
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
struct DummyConnection {
|
struct DummyConnection {
|
||||||
state: DummyConnectionState
|
state: DummyConnectionState
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `DummyMuxer` implements `StreamMuxer` and methods to control its behavior when used in tests
|
/// `DummyMuxer` implements `StreamMuxer` and methods to control its behavior when used in tests
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct DummyMuxer{
|
pub struct DummyMuxer{
|
||||||
in_connection: DummyConnection,
|
in_connection: DummyConnection,
|
||||||
out_connection: DummyConnection,
|
out_connection: DummyConnection,
|
||||||
|
@ -20,5 +20,9 @@
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) mod dummy_muxer;
|
pub(crate) mod dummy_muxer;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) mod dummy_transport;
|
pub(crate) mod dummy_transport;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(crate) mod dummy_handler;
|
||||||
|
Reference in New Issue
Block a user