Mirror of https://github.com/fluencelabs/rust-libp2p (synced 2025-06-14 10:31:21 +00:00)

Fix simultaneous dialing test (#957)

@@ -243,6 +243,13 @@ where
         }
     }
 
+    fn is_remote_acknowledged(&self) -> bool {
+        match self {
+            EitherOutput::First(inner) => inner.is_remote_acknowledged(),
+            EitherOutput::Second(inner) => inner.is_remote_acknowledged()
+        }
+    }
+
     fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> {
         match self {
             EitherOutput::First(inner) => inner.shutdown(kind),
@@ -143,6 +143,14 @@ pub trait StreamMuxer {
     /// Destroys a substream.
     fn destroy_substream(&self, s: Self::Substream);
 
+    /// Returns `true` if the remote has shown any sign of activity after the muxer has been open.
+    ///
+    /// For optimisation purposes, the connection handshake of libp2p can be very optimistic and is
+    /// allowed to assume that the handshake has succeeded when it didn't in fact succeed. This
+    /// method can be called in order to determine whether the remote has accepted our handshake or
+    /// has potentially not received it yet.
+    fn is_remote_acknowledged(&self) -> bool;
+
     /// Shutdown this `StreamMuxer`.
     ///
     /// If supported, sends a hint to the remote that we may no longer open any further outbound
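
Aside (not part of the commit): a minimal sketch of how a muxer implementation can satisfy the new `is_remote_acknowledged` contract. The mplex and yamux hunks further down in this diff follow the same idea; the `MyMuxer` type and the `on_inbound_data` hook here are made up for illustration only.

    use std::sync::atomic::{AtomicBool, Ordering};

    /// Hypothetical muxer used only to illustrate the new trait method.
    struct MyMuxer {
        /// Flipped to `true` the first time the remote sends us anything.
        remote_acknowledged: AtomicBool,
    }

    impl MyMuxer {
        /// Called from the read path whenever inbound data or a new substream arrives.
        fn on_inbound_data(&self) {
            self.remote_acknowledged.store(true, Ordering::Release);
        }

        /// Mirrors the semantics of `StreamMuxer::is_remote_acknowledged`.
        fn is_remote_acknowledged(&self) -> bool {
            self.remote_acknowledged.load(Ordering::Acquire)
        }
    }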
@@ -477,6 +485,11 @@ impl StreamMuxer for StreamMuxerBox {
         self.inner.shutdown(kind)
     }
 
+    #[inline]
+    fn is_remote_acknowledged(&self) -> bool {
+        self.inner.is_remote_acknowledged()
+    }
+
     #[inline]
     fn flush_all(&self) -> Poll<(), IoError> {
         self.inner.flush_all()
@@ -572,6 +585,11 @@ impl<T> StreamMuxer for Wrap<T> where T: StreamMuxer {
         self.inner.shutdown(kind)
     }
 
+    #[inline]
+    fn is_remote_acknowledged(&self) -> bool {
+        self.inner.is_remote_acknowledged()
+    }
+
     #[inline]
     fn flush_all(&self) -> Poll<(), IoError> {
         self.inner.flush_all()
@@ -24,7 +24,7 @@ use crate::{
     nodes::{
         node::Substream,
         handled_node_tasks::{HandledNodesEvent, HandledNodesTasks, TaskClosedEvent},
-        handled_node_tasks::{IntoNodeHandler, Task as HandledNodesTask, TaskId},
+        handled_node_tasks::{IntoNodeHandler, Task as HandledNodesTask, TaskId, ClosedTask},
        handled_node::{HandledNodeError, NodeHandler}
    }
};
@@ -209,7 +209,7 @@ where
            TaskState::Connected(ref p, _) if *p == self.peer_id => true,
            _ => false
        });
-        let user_data = match former_task.close() {
+        let user_data = match former_task.close().into_user_data() {
            TaskState::Connected(_, user_data) => user_data,
            _ => panic!("The former task was picked from `nodes`; all the nodes in `nodes` \
                are always in the connected state")
@@ -318,7 +318,7 @@ where
    /// Interrupts a reach attempt.
    ///
    /// Returns `Ok` if something was interrupted, and `Err` if the ID is not or no longer valid.
-    pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<(), InterruptError> {
+    pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<InterruptedReachAttempt<TInEvent, TPeerId, TUserData>, InterruptError> {
        match self.inner.task(id.0) {
            None => Err(InterruptError::ReachAttemptNotFound),
            Some(task) => {
@@ -327,8 +327,9 @@ where
                    TaskState::Pending => (),
                };
 
-                task.close();
-                Ok(())
+                Ok(InterruptedReachAttempt {
+                    inner: task.close(),
+                })
            }
        }
    }
@@ -389,7 +390,10 @@ where
            };
 
            match item {
-                HandledNodesEvent::TaskClosed { id, result, handler, user_data } => {
+                HandledNodesEvent::TaskClosed { task, result, handler } => {
+                    let id = task.id();
+                    let user_data = task.into_user_data();
+
                    match (user_data, result, handler) {
                        (TaskState::Pending, Err(TaskClosedEvent::Reach(err)), Some(handler)) => {
                            Async::Ready(CollectionEvent::ReachError {
@@ -496,6 +500,23 @@ impl fmt::Display for InterruptError {
 
 impl error::Error for InterruptError {}
 
+/// Reach attempt after it has been interrupted.
+pub struct InterruptedReachAttempt<TInEvent, TPeerId, TUserData> {
+    inner: ClosedTask<TInEvent, TaskState<TPeerId, TUserData>>,
+}
+
+impl<TInEvent, TPeerId, TUserData> fmt::Debug for InterruptedReachAttempt<TInEvent, TPeerId, TUserData>
+where
+    TUserData: fmt::Debug,
+    TPeerId: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
+        f.debug_tuple("InterruptedReachAttempt")
+            .field(&self.inner)
+            .finish()
+    }
+}
+
 /// Access to a peer in the collection.
 pub struct PeerMut<'a, TInEvent, TUserData, TPeerId = PeerId> {
     inner: HandledNodesTask<'a, TInEvent, TaskState<TPeerId, TUserData>>,
@@ -544,7 +565,7 @@ where
    /// No further event will be generated for this node.
    pub fn close(self) -> TUserData {
        let task_id = self.inner.id();
-        if let TaskState::Connected(peer_id, user_data) = self.inner.close() {
+        if let TaskState::Connected(peer_id, user_data) = self.inner.close().into_user_data() {
            let old_task_id = self.nodes.remove(&peer_id);
            debug_assert_eq!(old_task_id, Some(task_id));
            user_data
@@ -553,4 +574,16 @@ where
                nodes always matched a Connected entry in the tasks; QED");
        }
    }
+
+    /// Gives ownership of a closed reach attempt. As soon as the connection to the peer (`self`)
+    /// has some acknowledgment from the remote that its connection is alive, it will close the
+    /// connection inside `id`.
+    ///
+    /// The reach attempt will only be effectively cancelled once the peer (the object you're
+    /// manipulating) has received some network activity. However, no event will ever be
+    /// generated from this reach attempt, and this takes effect immediately.
+    pub fn take_over(&mut self, id: InterruptedReachAttempt<TInEvent, TPeerId, TUserData>) {
+        let _state = self.inner.take_over(id.inner);
+        debug_assert!(if let TaskState::Pending = _state { true } else { false });
+    }
 }
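
Aside (not part of the commit): `interrupt` and `take_over` are designed to be used together. The call site added in the `raw_swarm` hunk later in this diff is reproduced here, trimmed, to show the intended flow.

    // Excerpt from the raw_swarm change below (expect message shortened):
    let interrupted = self.active_nodes
        .interrupt(interrupt)
        .expect("take_over is guaranteed to be gathered from `out_reach_attempts`; ...");
    self.active_nodes.peer_mut(&peer_id).unwrap().take_over(interrupted);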
@@ -215,6 +215,13 @@ where
        self.handler.inject_event(event);
    }
 
+    /// Returns `true` if the remote has shown any sign of activity after the muxer has been open.
+    ///
+    /// See `StreamMuxer::is_remote_acknowledged`.
+    pub fn is_remote_acknowledged(&self) -> bool {
+        self.node.get_ref().is_remote_acknowledged()
+    }
+
    /// Returns true if the inbound channel of the muxer is open.
    ///
    /// If `true` is returned, more inbound substream will be received.
@@ -60,7 +60,7 @@ pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THand
    /// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts
    /// the task. It is possible that we receive messages from tasks that used to be in this list
    /// but no longer are, in which case we should ignore them.
-    tasks: FnvHashMap<TaskId, (mpsc::UnboundedSender<TInEvent>, TUserData)>,
+    tasks: FnvHashMap<TaskId, (mpsc::UnboundedSender<ExtToInMessage<TInEvent>>, TUserData)>,
 
    /// Identifier for the next task to spawn.
    next_task_id: TaskId,
@@ -152,10 +152,8 @@ pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THa
    /// This happens once the node handler closes or an error happens.
    // TODO: send back undelivered events?
    TaskClosed {
-        /// Identifier of the task that closed.
-        id: TaskId,
-        /// The user data that was associated with the task.
-        user_data: TUserData,
+        /// The task that has been closed.
+        task: ClosedTask<TInEvent, TUserData>,
        /// What happened.
        result: Result<(), TaskClosedEvent<TReachErr, THandlerErr>>,
        /// If the task closed before reaching the node, this contains the handler that was passed
@@ -226,6 +224,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
        self.tasks.insert(task_id, (tx, user_data));
 
        let task = Box::new(NodeTask {
+            taken_over: SmallVec::new(),
            inner: NodeTaskInner::Future {
                future,
                handler,
@@ -248,7 +247,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
            // Note: it is possible that sending an event fails if the background task has already
            // finished, but the local state hasn't reflected that yet because it hasn't been
            // polled. This is not an error situation.
-            let _ = sender.unbounded_send(event.clone());
+            let _ = sender.unbounded_send(ExtToInMessage::HandlerEvent(event.clone()));
        }
    }
 
@@ -296,10 +295,16 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
                    }
                },
                InToExtMessage::TaskClosed(result, handler) => {
-                    let (_, user_data) = self.tasks.remove(&task_id)
+                    let (channel, user_data) = self.tasks.remove(&task_id)
                        .expect("poll_inner only returns valid TaskIds; QED");
                    HandledNodesEvent::TaskClosed {
-                        id: task_id, result, handler, user_data,
+                        task: ClosedTask {
+                            id: task_id,
+                            channel,
+                            user_data,
+                        },
+                        result,
+                        handler,
                    }
                },
            })
@@ -339,7 +344,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
 
 /// Access to a task in the collection.
 pub struct Task<'a, TInEvent, TUserData> {
-    inner: OccupiedEntry<'a, TaskId, (mpsc::UnboundedSender<TInEvent>, TUserData)>,
+    inner: OccupiedEntry<'a, TaskId, (mpsc::UnboundedSender<ExtToInMessage<TInEvent>>, TUserData)>,
 }
 
 impl<'a, TInEvent, TUserData> Task<'a, TInEvent, TUserData> {
@@ -350,7 +355,7 @@ impl<'a, TInEvent, TUserData> Task<'a, TInEvent, TUserData> {
        // It is possible that the sender is closed if the background task has already finished
        // but the local state hasn't been updated yet because we haven't been polled in the
        // meanwhile.
-        let _ = self.inner.get_mut().0.unbounded_send(event);
+        let _ = self.inner.get_mut().0.unbounded_send(ExtToInMessage::HandlerEvent(event));
    }
 
    /// Returns the user data associated with the task.
@@ -371,9 +376,22 @@ impl<'a, TInEvent, TUserData> Task<'a, TInEvent, TUserData> {
 
    /// Closes the task. Returns the user data.
    ///
-    /// No further event will be generated for this task.
-    pub fn close(self) -> TUserData {
-        self.inner.remove().1
+    /// No further event will be generated for this task, but the connection inside the task will
+    /// continue to run until the `ClosedTask` is destroyed.
+    pub fn close(self) -> ClosedTask<TInEvent, TUserData> {
+        let id = *self.inner.key();
+        let (channel, user_data) = self.inner.remove();
+        ClosedTask { id, channel, user_data }
    }
 
+    /// Gives ownership of a closed task. As soon as our task (`self`) has some acknowledgment from
+    /// the remote that its connection is alive, it will close the connection with `other`.
+    pub fn take_over(&mut self, other: ClosedTask<TInEvent, TUserData>) -> TUserData {
+        // It is possible that the sender is closed if the background task has already finished
+        // but the local state hasn't been updated yet because we haven't been polled in the
+        // meanwhile.
+        let _ = self.inner.get_mut().0.unbounded_send(ExtToInMessage::TakeOver(other.channel));
+        other.user_data
+    }
 }
@@ -389,6 +407,66 @@ where
    }
 }
 
+/// Task after it has been closed. The connection to the remote is potentially still going on, but
+/// no new event for this task will be received.
+pub struct ClosedTask<TInEvent, TUserData> {
+    /// Identifier of the task that closed. No longer corresponds to anything, but can be reported
+    /// to the user.
+    id: TaskId,
+    /// The channel to the task. The task will continue to work for as long as this channel is
+    /// alive, but events produced by it are ignored.
+    channel: mpsc::UnboundedSender<ExtToInMessage<TInEvent>>,
+    /// The data provided by the user.
+    user_data: TUserData,
+}
+
+impl<TInEvent, TUserData> ClosedTask<TInEvent, TUserData> {
+    /// Returns the task id. Note that this task is no longer part of the collection, and therefore
+    /// calling `task()` with this ID will fail.
+    #[inline]
+    pub fn id(&self) -> TaskId {
+        self.id
+    }
+
+    /// Returns the user data associated with the task.
+    pub fn user_data(&self) -> &TUserData {
+        &self.user_data
+    }
+
+    /// Returns the user data associated with the task.
+    pub fn user_data_mut(&mut self) -> &mut TUserData {
+        &mut self.user_data
+    }
+
+    /// Finish destroying the task and yield the user data. This closes the connection to the
+    /// remote.
+    pub fn into_user_data(self) -> TUserData {
+        self.user_data
+    }
+}
+
+impl<TInEvent, TUserData> fmt::Debug for ClosedTask<TInEvent, TUserData>
+where
+    TUserData: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
+        f.debug_tuple("ClosedTask")
+            .field(&self.id)
+            .field(&self.user_data)
+            .finish()
+    }
+}
+
+/// Message to transmit from the public API to a task.
+#[derive(Debug)]
+enum ExtToInMessage<TInEvent> {
+    /// An event to transmit to the node handler.
+    HandlerEvent(TInEvent),
+    /// When received, stores the parameter inside the task and keeps it alive until we have an
+    /// acknowledgment that the remote has accepted our handshake.
+    TakeOver(mpsc::UnboundedSender<ExtToInMessage<TInEvent>>),
+}
+
 /// Message to transmit from a task to the public API.
 #[derive(Debug)]
 enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> {
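
Aside (not part of the commit): the keep-alive trick here is plain channel ownership. Holding a `ClosedTask` keeps the sender half of the task's channel alive, so the background task never observes end-of-stream and the old connection keeps running; dropping it (or clearing `taken_over` once the remote acknowledges) closes the channel and lets the task shut down. A dependency-free sketch of the same mechanism using `std::sync::mpsc` instead of the futures channels used by the diff:

    use std::sync::mpsc;

    fn main() {
        let (old_task_tx, old_task_rx) = mpsc::channel::<&'static str>();

        // The winning task stashes the losing task's sender (cf. `taken_over`).
        // While it is held, the receiver only sees "no message yet", not a
        // disconnect, so the old connection keeps running in the background.
        let mut taken_over = vec![old_task_tx];
        assert_eq!(old_task_rx.try_recv(), Err(mpsc::TryRecvError::Empty));

        // Once the remote has acknowledged the winning connection, the kept
        // senders are dropped, the channel closes, and the old task can shut down.
        taken_over.clear();
        assert_eq!(old_task_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
    }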
@@ -411,11 +489,13 @@ where
    /// Sender to transmit events to the outside.
    events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, <TIntoHandler::Handler as NodeHandler>::Error, TPeerId>, TaskId)>,
    /// Receiving end for events sent from the main `HandledNodesTasks`.
-    in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<TInEvent>>,
+    in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<ExtToInMessage<TInEvent>>>,
    /// Inner state of the `NodeTask`.
    inner: NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TPeerId>,
    /// Identifier of the attempt.
    id: TaskId,
+    /// Channels to keep alive for as long as we don't have an acknowledgment from the remote.
+    taken_over: SmallVec<[mpsc::UnboundedSender<ExtToInMessage<TInEvent>>; 1]>,
 }
 
 enum NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TPeerId>
@@ -463,7 +543,12 @@ where
                loop {
                    match self.in_events_rx.poll() {
                        Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
-                        Ok(Async::Ready(Some(event))) => events_buffer.push(event),
+                        Ok(Async::Ready(Some(ExtToInMessage::HandlerEvent(event)))) => {
+                            events_buffer.push(event)
+                        },
+                        Ok(Async::Ready(Some(ExtToInMessage::TakeOver(take_over)))) => {
+                            self.taken_over.push(take_over);
+                        },
                        Ok(Async::NotReady) => break,
                        Err(_) => unreachable!("An UnboundedReceiver never errors"),
                    }
@@ -501,9 +586,12 @@ where
                loop {
                    match self.in_events_rx.poll() {
                        Ok(Async::NotReady) => break,
-                        Ok(Async::Ready(Some(event))) => {
+                        Ok(Async::Ready(Some(ExtToInMessage::HandlerEvent(event)))) => {
                            node.inject_event(event)
                        },
+                        Ok(Async::Ready(Some(ExtToInMessage::TakeOver(take_over)))) => {
+                            self.taken_over.push(take_over);
+                        },
                        Ok(Async::Ready(None)) => {
                            // Node closed by the external API; start shutdown process.
                            node.shutdown();
@@ -516,6 +604,10 @@ where
 
                // Process the node.
                loop {
+                    if !self.taken_over.is_empty() && node.is_remote_acknowledged() {
+                        self.taken_over.clear();
+                    }
+
                    match node.poll() {
                        Ok(Async::NotReady) => {
                            self.inner = NodeTaskInner::Node(node);
@@ -24,79 +24,12 @@ use super::*;
 
 use std::io;
 
-use assert_matches::assert_matches;
-use futures::future::{self, FutureResult};
-use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
-use crate::tests::dummy_handler::{Handler, InEvent, OutEvent, TestHandledNode};
-use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
-use tokio::runtime::current_thread::Runtime;
+use futures::future;
+use crate::tests::dummy_handler::{Handler, InEvent, OutEvent};
+use crate::tests::dummy_muxer::DummyMuxer;
 use void::Void;
 use crate::PeerId;
 
-type TestNodeTask = NodeTask<
-    FutureResult<(PeerId, DummyMuxer), io::Error>,
-    DummyMuxer,
-    Handler,
-    InEvent,
-    OutEvent,
-    io::Error,
-    PeerId,
->;
-
-struct NodeTaskTestBuilder {
-    task_id: TaskId,
-    inner_node: Option<TestHandledNode>,
-    inner_fut: Option<FutureResult<(PeerId, DummyMuxer), io::Error>>,
-}
-
-impl NodeTaskTestBuilder {
-    fn new() -> Self {
-        NodeTaskTestBuilder {
-            task_id: TaskId(123),
-            inner_node: None,
-            inner_fut: {
-                let peer_id = PeerId::random();
-                Some(future::ok((peer_id, DummyMuxer::new())))
-            },
-        }
-    }
-
-    fn with_inner_fut(&mut self, fut: FutureResult<(PeerId, DummyMuxer), io::Error>) -> &mut Self{
-        self.inner_fut = Some(fut);
-        self
-    }
-
-    fn with_task_id(&mut self, id: usize) -> &mut Self {
-        self.task_id = TaskId(id);
-        self
-    }
-
-    fn node_task(&mut self) -> (
-        TestNodeTask,
-        UnboundedSender<InEvent>,
-        UnboundedReceiver<(InToExtMessage<OutEvent, Handler, io::Error, io::Error, PeerId>, TaskId)>,
-    ) {
-        let (events_from_node_task_tx, events_from_node_task_rx) = mpsc::unbounded::<(InToExtMessage<OutEvent, Handler, _, _, _>, TaskId)>();
-        let (events_to_node_task_tx, events_to_node_task_rx) = mpsc::unbounded::<InEvent>();
-        let inner = if self.inner_node.is_some() {
-            NodeTaskInner::Node(self.inner_node.take().unwrap())
-        } else {
-            NodeTaskInner::Future {
-                future: self.inner_fut.take().unwrap(),
-                handler: Handler::default(),
-                events_buffer: Vec::new(),
-            }
-        };
-        let node_task = NodeTask {
-            inner,
-            events_tx: events_from_node_task_tx.clone(), // events TO the outside
-            in_events_rx: events_to_node_task_rx.fuse(), // events FROM the outside
-            id: self.task_id,
-        };
-        (node_task, events_to_node_task_tx, events_from_node_task_rx)
-    }
-}
-
 type TestHandledNodesTasks = HandledNodesTasks<InEvent, OutEvent, Handler, io::Error, io::Error, ()>;
 
 struct HandledNodeTaskTestBuilder {
@@ -133,91 +66,6 @@ impl HandledNodeTaskTestBuilder {
 }
 
 
-// Tests for NodeTask
-
-#[test]
-fn task_emits_event_when_things_happen_in_the_node() {
-    let (node_task, tx, mut rx) = NodeTaskTestBuilder::new()
-        .with_task_id(890)
-        .node_task();
-
-    tx.unbounded_send(InEvent::Custom("beef")).expect("send to NodeTask should work");
-    let mut rt = Runtime::new().unwrap();
-    rt.spawn(node_task);
-    let events = rt.block_on(rx.by_ref().take(2).collect()).expect("reading on rx should work");
-
-    assert_matches!(events[0], (InToExtMessage::NodeReached(_), TaskId(890)));
-    assert_matches!(events[1], (InToExtMessage::NodeEvent(ref outevent), TaskId(890)) => {
-        assert_matches!(outevent, OutEvent::Custom(beef) => {
-            assert_eq!(beef, &"beef");
-        })
-    });
-}
-
-#[test]
-fn task_exits_when_node_errors() {
-    let mut rt = Runtime::new().unwrap();
-    let (node_task, _tx, rx) = NodeTaskTestBuilder::new()
-        .with_inner_fut(future::err(io::Error::new(io::ErrorKind::Other, "nah")))
-        .with_task_id(345)
-        .node_task();
-
-    rt.spawn(node_task);
-    let events = rt.block_on(rx.collect()).expect("rx failed");
-    assert!(events.len() == 1);
-    assert_matches!(events[0], (InToExtMessage::TaskClosed{..}, TaskId(345)));
-}
-
-#[test]
-fn task_exits_when_node_is_done() {
-    let mut rt = Runtime::new().unwrap();
-    let fut = {
-        let peer_id = PeerId::random();
-        let mut muxer = DummyMuxer::new();
-        muxer.set_inbound_connection_state(DummyConnectionState::Closed);
-        muxer.set_outbound_connection_state(DummyConnectionState::Closed);
-        future::ok((peer_id, muxer))
-    };
-    let (node_task, tx, rx) = NodeTaskTestBuilder::new()
-        .with_inner_fut(fut)
-        .with_task_id(345)
-        .node_task();
-
-    // Even though we set up the muxer outbound state to be `Closed` we
-    // still need to open a substream or the outbound state will never
-    // be checked (see https://github.com/libp2p/rust-libp2p/issues/609).
-    // We do not have a HandledNode yet, so we can't simply call
-    // `open_substream`. Instead we send a message to the NodeTask,
-    // which will be buffered until the inner future resolves, then
-    // it'll call `inject_event` on the handler. In the test Handler,
-    // inject_event will set the next state so that it yields an
-    // OutboundSubstreamRequest.
-    // Back up in the HandledNode, at the next iteration we'll
-    // open_substream() and iterate again. This time, node.poll() will
-    // poll the muxer inbound (closed) and also outbound (because now
-    // there's an entry in the outbound_streams) which will be Closed
-    // (because we set up the muxer state so) and thus yield
-    // Async::Ready(None) which in turn makes the NodeStream yield an
-    // Async::Ready(OutboundClosed) to the HandledNode.
-    // Now we're at the point where poll_inbound, poll_outbound and
-    // address are all skipped and there is nothing left to do: we yield
-    // Async::Ready(None) from the NodeStream. In the HandledNode,
-    // Async::Ready(None) triggers a shutdown of the Handler so that it
-    // also yields Async::Ready(None). Finally, the NodeTask gets a
-    // Async::Ready(None) and sends a TaskClosed and returns
-    // Async::Ready(()). QED.
-
-    let create_outbound_substream_event = InEvent::Substream(Some(135));
-    tx.unbounded_send(create_outbound_substream_event).expect("send msg works");
-    rt.spawn(node_task);
-    let events = rt.block_on(rx.collect()).expect("rx failed");
-
-    assert_eq!(events.len(), 2);
-    assert_matches!(events[0].0, InToExtMessage::NodeReached(PeerId{..}));
-    assert_matches!(events[1].0, InToExtMessage::TaskClosed(Ok(()), _));
-}
-
-
 // Tests for HandledNodeTasks
 
 #[test]
@@ -163,6 +163,13 @@ where
        self.outbound_state == StreamState::Open
    }
 
+    /// Returns `true` if the remote has shown any sign of activity after the muxer has been open.
+    ///
+    /// See `StreamMuxer::is_remote_acknowledged`.
+    pub fn is_remote_acknowledged(&self) -> bool {
+        self.muxer.is_remote_acknowledged()
+    }
+
    /// Destroys the node stream and returns all the pending outbound substreams.
    pub fn close(mut self) -> Vec<TUserData> {
        self.cancel_outgoing()
@@ -1033,16 +1033,17 @@ where
            self.start_dial_out(peer_id, handler, first, rest);
        }
 
-        if let Some(interrupt) = action.interrupt {
+        if let Some((peer_id, interrupt)) = action.take_over {
            // TODO: improve proof or remove; this is too complicated right now
-            self.active_nodes
+            let interrupted = self.active_nodes
                .interrupt(interrupt)
-                .expect("interrupt is guaranteed to be gathered from `out_reach_attempts`;
+                .expect("take_over is guaranteed to be gathered from `out_reach_attempts`;
                    we insert in out_reach_attempts only when we call \
                    active_nodes.add_reach_attempt, and we remove only when we call \
                    interrupt or when a reach attempt succeeds or errors; therefore the \
                    out_reach_attempts should always be in sync with the actual \
                    attempts; QED");
+            self.active_nodes.peer_mut(&peer_id).unwrap().take_over(interrupted);
        }
 
        Async::Ready(out_event)
@@ -1054,14 +1055,16 @@ where
 #[must_use]
 struct ActionItem<THandler, TPeerId> {
    start_dial_out: Option<(TPeerId, THandler, Multiaddr, Vec<Multiaddr>)>,
-    interrupt: Option<ReachAttemptId>,
+    /// The `ReachAttemptId` should be interrupted, and the task for the given `PeerId` should take
+    /// over it.
+    take_over: Option<(TPeerId, ReachAttemptId)>,
 }
 
 impl<THandler, TPeerId> Default for ActionItem<THandler, TPeerId> {
    fn default() -> Self {
        ActionItem {
            start_dial_out: None,
-            interrupt: None,
+            take_over: None,
        }
    }
 }
@@ -1126,7 +1129,7 @@ where
    if let Some(attempt) = reach_attempts.out_reach_attempts.remove(&event.peer_id()) {
        debug_assert_ne!(attempt.id, event.reach_attempt_id());
        ActionItem {
-            interrupt: Some(attempt.id),
+            take_over: Some((event.peer_id().clone(), attempt.id)),
            .. Default::default()
        }
    } else {
@@ -114,6 +114,7 @@ impl StreamMuxer for DummyMuxer {
        unreachable!()
    }
    fn destroy_substream(&self, _: Self::Substream) {}
+    fn is_remote_acknowledged(&self) -> bool { true }
    fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> {
        Ok(Async::Ready(()))
    }
@@ -20,9 +20,10 @@
 
 use futures::{future, prelude::*};
 use libp2p_core::nodes::raw_swarm::{RawSwarm, RawSwarmEvent, IncomingError};
-use libp2p_core::{Transport, upgrade, upgrade::OutboundUpgradeExt};
+use libp2p_core::{Transport, upgrade, upgrade::OutboundUpgradeExt, upgrade::InboundUpgradeExt};
 use libp2p_core::protocols_handler::{ProtocolsHandler, KeepAlive, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr};
-use std::io;
+use std::{io, time::Duration, time::Instant};
+use tokio_timer::Delay;
 
 // TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ?
 struct TestHandler<TSubstream>(std::marker::PhantomData<TSubstream>, bool);
@@ -99,6 +100,10 @@ fn raw_swarm_simultaneous_connect() {
    // connection with the listening one.
    //
 
+    // Important note: This test is meant to detect race conditions which don't seem to happen
+    // if we use the `MemoryTransport`. Using the TCP transport is important,
+    // despite the fact that it adds a dependency.
+
    for _ in 0 .. 10 {
        // TODO: make creating the transport more elegant ; literaly half of the code of the test
        // is about creating the transport
@@ -107,11 +112,13 @@ fn raw_swarm_simultaneous_connect() {
            let local_public_key = local_key.to_public_key();
            let transport = libp2p_tcp::TcpConfig::new()
                .with_upgrade(libp2p_secio::SecioConfig::new(local_key))
-                .and_then(move |out, _| {
+                .and_then(move |out, endpoint| {
                    let peer_id = out.remote_key.into_peer_id();
-                    let upgrade =
-                        libp2p_mplex::MplexConfig::default().map_outbound(move |muxer| (peer_id, muxer));
-                    upgrade::apply_outbound(out.stream, upgrade)
+                    let peer_id2 = peer_id.clone();
+                    let upgrade = libp2p_mplex::MplexConfig::default()
+                        .map_outbound(move |muxer| (peer_id, muxer))
+                        .map_inbound(move |muxer| (peer_id2, muxer));
+                    upgrade::apply(out.stream, upgrade, endpoint)
                });
            RawSwarm::new(transport, local_public_key.into_peer_id())
        };
@@ -121,11 +128,13 @@ fn raw_swarm_simultaneous_connect() {
            let local_public_key = local_key.to_public_key();
            let transport = libp2p_tcp::TcpConfig::new()
                .with_upgrade(libp2p_secio::SecioConfig::new(local_key))
-                .and_then(move |out, _| {
+                .and_then(move |out, endpoint| {
                    let peer_id = out.remote_key.into_peer_id();
-                    let upgrade =
-                        libp2p_mplex::MplexConfig::default().map_outbound(move |muxer| (peer_id, muxer));
-                    upgrade::apply_outbound(out.stream, upgrade)
+                    let peer_id2 = peer_id.clone();
+                    let upgrade = libp2p_mplex::MplexConfig::default()
+                        .map_outbound(move |muxer| (peer_id, muxer))
+                        .map_inbound(move |muxer| (peer_id2, muxer));
+                    upgrade::apply(out.stream, upgrade, endpoint)
                });
            RawSwarm::new(transport, local_public_key.into_peer_id())
        };
@@ -139,6 +148,9 @@ fn raw_swarm_simultaneous_connect() {
        let mut swarm1_step = 0;
        let mut swarm2_step = 0;
 
+        let mut swarm1_dial_start = Delay::new(Instant::now() + Duration::new(0, rand::random::<u32>() % 50_000_000));
+        let mut swarm2_dial_start = Delay::new(Instant::now() + Duration::new(0, rand::random::<u32>() % 50_000_000));
+
        let future = future::poll_fn(|| -> Poll<(), io::Error> {
            loop {
                let mut swarm1_not_ready = false;
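
Aside (not part of the commit): `Duration::new(0, rand::random::<u32>() % 50_000_000)` is a random number of nanoseconds below 50 ms, so each swarm now starts its dial after an independent jitter instead of relying on a per-iteration coin flip. A dependency-free check of that arithmetic, with a fixed `sample` standing in for `rand::random::<u32>()`:

    use std::time::Duration;

    fn main() {
        // Stand-in for rand::random::<u32>(); any u32 gives a delay below 50 ms.
        let sample: u32 = 1_234_567_890;
        let jitter = Duration::new(0, sample % 50_000_000);
        assert!(jitter < Duration::from_millis(50));
        println!("dial starts after {:?}", jitter);
    }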
@@ -147,21 +159,33 @@ fn raw_swarm_simultaneous_connect() {
                // We add a lot of randomness. In a real-life situation the swarm also has to
                // handle other nodes, which may delay the processing.
 
-                if swarm1_step == 0 && rand::random::<f32>() < 0.2 {
+                if swarm1_step == 0 {
+                    match swarm1_dial_start.poll().unwrap() {
+                        Async::Ready(_) => {
                            let handler = TestHandler::default().into_node_handler_builder();
                            swarm1.peer(swarm2.local_peer_id().clone()).into_not_connected().unwrap()
                                .connect(swarm2_listen.clone(), handler);
                            swarm1_step = 1;
+                            swarm1_not_ready = false;
+                        },
+                        Async::NotReady => swarm1_not_ready = true,
+                    }
                }
 
-                if swarm2_step == 0 && rand::random::<f32>() < 0.2 {
+                if swarm2_step == 0 {
+                    match swarm2_dial_start.poll().unwrap() {
+                        Async::Ready(_) => {
                            let handler = TestHandler::default().into_node_handler_builder();
                            swarm2.peer(swarm1.local_peer_id().clone()).into_not_connected().unwrap()
                                .connect(swarm1_listen.clone(), handler);
                            swarm2_step = 1;
+                            swarm2_not_ready = false;
+                        },
+                        Async::NotReady => swarm2_not_ready = true,
+                    }
                }
 
-                if rand::random::<f32>() < 0.5 {
+                if rand::random::<f32>() < 0.1 {
                    match swarm1.poll() {
                        Async::Ready(RawSwarmEvent::IncomingConnectionError { error: IncomingError::DeniedLowerPriority, .. }) => {
                            assert_eq!(swarm1_step, 2);
@@ -185,7 +209,7 @@ fn raw_swarm_simultaneous_connect() {
                    }
                }
 
-                if rand::random::<f32>() < 0.5 {
+                if rand::random::<f32>() < 0.1 {
                    match swarm2.poll() {
                        Async::Ready(RawSwarmEvent::IncomingConnectionError { error: IncomingError::DeniedLowerPriority, .. }) => {
                            assert_eq!(swarm2_step, 2);
@@ -209,11 +233,12 @@ fn raw_swarm_simultaneous_connect() {
                    }
                }
 
-                if swarm1_step == 3 && swarm2_step == 3 {
+                // TODO: make sure that >= 5 is correct
+                if swarm1_step + swarm2_step >= 5 {
                    return Ok(Async::Ready(()));
                }
 
-                if swarm1_step != 0 && swarm2_step != 0 && swarm1_not_ready && swarm2_not_ready {
+                if swarm1_not_ready && swarm2_not_ready {
                    return Ok(Async::NotReady);
                }
            }
@@ -114,7 +114,8 @@ impl MplexConfig {
            notifier_write: Arc::new(Notifier {
                to_notify: Mutex::new(Default::default()),
            }),
-            is_shutdown: false
+            is_shutdown: false,
+            is_acknowledged: false,
        })
    }
 }
@@ -208,7 +209,9 @@ struct MultiplexInner<C> {
    notifier_write: Arc<Notifier>,
    /// If true, the connection has been shut down. We need to be careful not to accidentally
    /// call `Sink::poll_complete` or `Sink::start_send` after `Sink::close`.
-    is_shutdown: bool
+    is_shutdown: bool,
+    /// If true, the remote has sent data to us.
+    is_acknowledged: bool,
 }
 
 struct Notifier {
@@ -296,6 +299,7 @@ where C: AsyncRead + AsyncWrite,
        };
 
        trace!("Received message: {:?}", elem);
+        inner.is_acknowledged = true;
 
        // Handle substreams opening/closing.
        match elem {
@@ -546,6 +550,10 @@ where C: AsyncRead + AsyncWrite
        })
    }
 
+    fn is_remote_acknowledged(&self) -> bool {
+        self.inner.lock().is_acknowledged
+    }
+
    #[inline]
    fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> {
        let inner = &mut *self.inner.lock();
@@ -24,19 +24,19 @@
 use futures::{future::{self, FutureResult}, prelude::*};
 use libp2p_core::{muxing::Shutdown, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}};
 use log::error;
-use std::{io, iter};
+use std::{io, iter, sync::atomic};
 use std::io::{Error as IoError};
 use tokio_io::{AsyncRead, AsyncWrite};
 
 
-pub struct Yamux<C>(yamux::Connection<C>);
+// TODO: add documentation and field names
+pub struct Yamux<C>(yamux::Connection<C>, atomic::AtomicBool);
 
 impl<C> Yamux<C>
 where
    C: AsyncRead + AsyncWrite + 'static
 {
    pub fn new(c: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
-        Yamux(yamux::Connection::new(c, cfg, mode))
+        Yamux(yamux::Connection::new(c, cfg, mode), atomic::AtomicBool::new(false))
    }
 }
 
@@ -56,7 +56,10 @@ where
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
-            Ok(Async::Ready(Some(stream))) => Ok(Async::Ready(Some(stream)))
+            Ok(Async::Ready(Some(stream))) => {
+                self.1.store(true, atomic::Ordering::Release);
+                Ok(Async::Ready(Some(stream)))
+            }
        }
    }
 
@@ -77,7 +80,11 @@ where
 
    #[inline]
    fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
-        sub.poll_read(buf)
+        let result = sub.poll_read(buf);
+        if let Ok(Async::Ready(_)) = result {
+            self.1.store(true, atomic::Ordering::Release);
+        }
+        result
    }
 
    #[inline]
@@ -99,6 +106,11 @@ where
    fn destroy_substream(&self, _: Self::Substream) {
    }
 
+    #[inline]
+    fn is_remote_acknowledged(&self) -> bool {
+        self.1.load(atomic::Ordering::Acquire)
+    }
+
    #[inline]
    fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> {
        self.0.shutdown()
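
Aside (not part of the commit): the yamux flag is written with `Ordering::Release` on the read path and read with `Ordering::Acquire`, so a thread that observes `true` also observes the work done before the store. A standalone illustration of that pairing, where the flag only ever goes from `false` to `true`:

    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;

    fn main() {
        let acknowledged = Arc::new(AtomicBool::new(false));

        let writer_flag = Arc::clone(&acknowledged);
        let writer = thread::spawn(move || {
            // Read path of the muxer: the first inbound frame marks the remote as alive.
            writer_flag.store(true, Ordering::Release);
        });
        writer.join().unwrap();

        // An Acquire load that sees `true` also sees everything before the Release store.
        assert!(acknowledged.load(Ordering::Acquire));
    }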