Add a user data to CollectionStream (#947)

* Add a user data to CollectionStream

* Make NodeEvent return a PeerMut

* Add PeerMut::user_data_mut()

* Return the previous user data in accept()
This commit is contained in:
Pierre Krieger
2019-02-18 16:10:00 +01:00
committed by GitHub
parent 6cb2c71ca3
commit ca9534a38e
4 changed files with 127 additions and 78 deletions

View File

@ -35,20 +35,20 @@ use std::{error, fmt, hash::Hash, mem};
mod tests; mod tests;
/// Implementation of `Stream` that handles a collection of nodes. /// Implementation of `Stream` that handles a collection of nodes.
pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId = PeerId> { pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> {
/// Object that handles the tasks. /// Object that handles the tasks.
/// ///
/// The user data contains the state of the task. If `Connected`, then a corresponding entry /// The user data contains the state of the task. If `Connected`, then a corresponding entry
/// must be present in `nodes`. /// must be present in `nodes`.
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TPeerId>, TPeerId>, inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TPeerId, TUserData>, TPeerId>,
/// List of nodes, with the task id that handles this node. The corresponding entry in `tasks` /// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
/// must always be in the `Connected` state. /// must always be in the `Connected` state.
nodes: FnvHashMap<TPeerId, TaskId>, nodes: FnvHashMap<TPeerId, TaskId>,
} }
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for
CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
where where
TPeerId: fmt::Debug, TPeerId: fmt::Debug,
{ {
@ -59,18 +59,18 @@ where
/// State of a task. /// State of a task.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
enum TaskState<TPeerId> { enum TaskState<TPeerId, TUserData> {
/// Task is attempting to reach a peer. /// Task is attempting to reach a peer.
Pending, Pending,
/// The task is connected to a peer. /// The task is connected to a peer.
Connected(TPeerId), Connected(TPeerId, TUserData),
} }
/// Event that can happen on the `CollectionStream`. /// Event that can happen on the `CollectionStream`.
pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr, TPeerId> { pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr, TUserData, TPeerId> {
/// A connection to a node has succeeded. You must use the provided event in order to accept /// A connection to a node has succeeded. You must use the provided event in order to accept
/// the connection. /// the connection.
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>), NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>),
/// A connection to a node has been closed. /// A connection to a node has been closed.
/// ///
@ -79,6 +79,8 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr
NodeClosed { NodeClosed {
/// Identifier of the node. /// Identifier of the node.
peer_id: TPeerId, peer_id: TPeerId,
/// User data that was passed when accepting.
user_data: TUserData,
}, },
/// A connection to a node has errored. /// A connection to a node has errored.
@ -89,6 +91,8 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr
peer_id: TPeerId, peer_id: TPeerId,
/// The error that happened. /// The error that happened.
error: HandledNodeError<THandlerErr>, error: HandledNodeError<THandlerErr>,
/// User data that was passed when accepting.
user_data: TUserData,
}, },
/// An error happened on the future that was trying to reach a node. /// An error happened on the future that was trying to reach a node.
@ -103,19 +107,20 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr
/// A node has produced an event. /// A node has produced an event.
NodeEvent { NodeEvent {
/// Identifier of the node. /// The node that has generated the event.
peer_id: TPeerId, peer: PeerMut<'a, TInEvent, TUserData, TPeerId>,
/// The produced event. /// The produced event.
event: TOutEvent, event: TOutEvent,
}, },
} }
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for
CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
where TOutEvent: fmt::Debug, where TOutEvent: fmt::Debug,
TReachErr: fmt::Debug, TReachErr: fmt::Debug,
THandlerErr: fmt::Debug, THandlerErr: fmt::Debug,
TPeerId: Eq + Hash + Clone + fmt::Debug, TPeerId: Eq + Hash + Clone + fmt::Debug,
TUserData: fmt::Debug,
{ {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match *self { match *self {
@ -124,14 +129,16 @@ where TOutEvent: fmt::Debug,
.field(inner) .field(inner)
.finish() .finish()
}, },
CollectionEvent::NodeClosed { ref peer_id } => { CollectionEvent::NodeClosed { ref peer_id, ref user_data } => {
f.debug_struct("CollectionEvent::NodeClosed") f.debug_struct("CollectionEvent::NodeClosed")
.field("peer_id", peer_id) .field("peer_id", peer_id)
.field("user_data", user_data)
.finish() .finish()
}, },
CollectionEvent::NodeError { ref peer_id, ref error } => { CollectionEvent::NodeError { ref peer_id, ref error, ref user_data } => {
f.debug_struct("CollectionEvent::NodeError") f.debug_struct("CollectionEvent::NodeError")
.field("peer_id", peer_id) .field("peer_id", peer_id)
.field("user_data", user_data)
.field("error", error) .field("error", error)
.finish() .finish()
}, },
@ -141,9 +148,9 @@ where TOutEvent: fmt::Debug,
.field("error", error) .field("error", error)
.finish() .finish()
}, },
CollectionEvent::NodeEvent { ref peer_id, ref event } => { CollectionEvent::NodeEvent { ref peer, ref event } => {
f.debug_struct("CollectionEvent::NodeEvent") f.debug_struct("CollectionEvent::NodeEvent")
.field("peer_id", peer_id) .field("peer_id", peer.id())
.field("event", event) .field("event", event)
.finish() .finish()
}, },
@ -153,17 +160,17 @@ where TOutEvent: fmt::Debug,
/// Event that happens when we reach a node. /// Event that happens when we reach a node.
#[must_use = "The node reached event is used to accept the newly-opened connection"] #[must_use = "The node reached event is used to accept the newly-opened connection"]
pub struct CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId = PeerId> { pub struct CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> {
/// Peer id we connected to. /// Peer id we connected to.
peer_id: TPeerId, peer_id: TPeerId,
/// The task id that reached the node. /// The task id that reached the node.
id: TaskId, id: TaskId,
/// The `CollectionStream` we are referencing. /// The `CollectionStream` we are referencing.
parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>, parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>,
} }
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
where where
TPeerId: Eq + Hash + Clone, TPeerId: Eq + Hash + Clone,
{ {
@ -187,21 +194,28 @@ where
} }
/// Accepts the new node. /// Accepts the new node.
pub fn accept(self) -> (CollectionNodeAccept, TPeerId) { pub fn accept(self, user_data: TUserData) -> (CollectionNodeAccept<TUserData>, TPeerId) {
// Set the state of the task to `Connected`. // Set the state of the task to `Connected`.
let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id); let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id);
*self.parent.inner.task(self.id) *self.parent.inner.task(self.id)
.expect("A CollectionReachEvent is only ever created from a valid attempt; QED") .expect("A CollectionReachEvent is only ever created from a valid attempt; QED")
.user_data_mut() = TaskState::Connected(self.peer_id.clone()); .user_data_mut() = TaskState::Connected(self.peer_id.clone(), user_data);
// It is possible that we already have a task connected to the same peer. In this // It is possible that we already have a task connected to the same peer. In this
// case, we need to emit a `NodeReplaced` event. // case, we need to emit a `NodeReplaced` event.
let tasks = &mut self.parent.inner; let tasks = &mut self.parent.inner;
let ret_value = if let Some(former_task) = former_task_id.and_then(|i| tasks.task(i)) { let ret_value = if let Some(former_task) = former_task_id.and_then(|i| tasks.task(i)) {
debug_assert!(*former_task.user_data() == TaskState::Connected(self.peer_id.clone())); debug_assert!(match *former_task.user_data() {
former_task.close(); TaskState::Connected(ref p, _) if *p == self.peer_id => true,
_ => false
});
let user_data = match former_task.close() {
TaskState::Connected(_, user_data) => user_data,
_ => panic!("The former task was picked from `nodes`; all the nodes in `nodes` \
are always in the connected state")
};
// TODO: we unfortunately have to clone the peer id here // TODO: we unfortunately have to clone the peer id here
(CollectionNodeAccept::ReplacedExisting, self.peer_id.clone()) (CollectionNodeAccept::ReplacedExisting(user_data), self.peer_id.clone())
} else { } else {
// TODO: we unfortunately have to clone the peer id here // TODO: we unfortunately have to clone the peer id here
(CollectionNodeAccept::NewEntry, self.peer_id.clone()) (CollectionNodeAccept::NewEntry, self.peer_id.clone())
@ -225,8 +239,8 @@ where
} }
} }
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
where where
TPeerId: Eq + Hash + Clone + fmt::Debug, TPeerId: Eq + Hash + Clone + fmt::Debug,
{ {
@ -238,8 +252,8 @@ where
} }
} }
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> Drop for impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> Drop for
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
{ {
fn drop(&mut self) { fn drop(&mut self) {
let task = self.parent.inner.task(self.id) let task = self.parent.inner.task(self.id)
@ -254,9 +268,9 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> Drop fo
/// Outcome of accepting a node. /// Outcome of accepting a node.
#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CollectionNodeAccept { pub enum CollectionNodeAccept<TUserData> {
/// We replaced an existing node. /// We replaced an existing node. Returns the user data that was assigned to this node.
ReplacedExisting, ReplacedExisting(TUserData),
/// We didn't replace anything existing. /// We didn't replace anything existing.
NewEntry, NewEntry,
} }
@ -265,8 +279,8 @@ pub enum CollectionNodeAccept {
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReachAttemptId(TaskId); pub struct ReachAttemptId(TaskId);
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
where where
TPeerId: Eq + Hash + Clone, TPeerId: Eq + Hash + Clone,
{ {
@ -309,7 +323,7 @@ where
None => Err(InterruptError::ReachAttemptNotFound), None => Err(InterruptError::ReachAttemptNotFound),
Some(task) => { Some(task) => {
match task.user_data() { match task.user_data() {
TaskState::Connected(_) => return Err(InterruptError::AlreadyReached), TaskState::Connected(_, _) => return Err(InterruptError::AlreadyReached),
TaskState::Pending => (), TaskState::Pending => (),
}; };
@ -332,7 +346,7 @@ where
/// ///
/// Returns `None` if we don't have a connection to this peer. /// Returns `None` if we don't have a connection to this peer.
#[inline] #[inline]
pub fn peer_mut(&mut self, id: &TPeerId) -> Option<PeerMut<'_, TInEvent, TPeerId>> { pub fn peer_mut(&mut self, id: &TPeerId) -> Option<PeerMut<'_, TInEvent, TUserData, TPeerId>> {
let task = match self.nodes.get(id) { let task = match self.nodes.get(id) {
Some(&task) => task, Some(&task) => task,
None => return None, None => return None,
@ -368,7 +382,7 @@ where
/// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to /// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to
/// > remove the `Err` variant, but also because we want the `CollectionStream` to stay /// > remove the `Err` variant, but also because we want the `CollectionStream` to stay
/// > borrowed if necessary. /// > borrowed if necessary.
pub fn poll(&mut self) -> Async<CollectionEvent<'_, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>> { pub fn poll(&mut self) -> Async<CollectionEvent<'_, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>> {
let item = match self.inner.poll() { let item = match self.inner.poll() {
Async::Ready(item) => item, Async::Ready(item) => item,
Async::NotReady => return Async::NotReady, Async::NotReady => return Async::NotReady,
@ -399,24 +413,26 @@ where
panic!("The HandledNodesTasks is guaranteed to always return the handler \ panic!("The HandledNodesTasks is guaranteed to always return the handler \
when producing a TaskClosedEvent::Reach error"); when producing a TaskClosedEvent::Reach error");
}, },
(TaskState::Connected(peer_id), Ok(()), _handler) => { (TaskState::Connected(peer_id, user_data), Ok(()), _handler) => {
debug_assert!(_handler.is_none()); debug_assert!(_handler.is_none());
let _node_task_id = self.nodes.remove(&peer_id); let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id)); debug_assert_eq!(_node_task_id, Some(id));
Async::Ready(CollectionEvent::NodeClosed { Async::Ready(CollectionEvent::NodeClosed {
peer_id, peer_id,
user_data,
}) })
}, },
(TaskState::Connected(peer_id), Err(TaskClosedEvent::Node(err)), _handler) => { (TaskState::Connected(peer_id, user_data), Err(TaskClosedEvent::Node(err)), _handler) => {
debug_assert!(_handler.is_none()); debug_assert!(_handler.is_none());
let _node_task_id = self.nodes.remove(&peer_id); let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id)); debug_assert_eq!(_node_task_id, Some(id));
Async::Ready(CollectionEvent::NodeError { Async::Ready(CollectionEvent::NodeError {
peer_id, peer_id,
error: err, error: err,
user_data,
}) })
}, },
(TaskState::Connected(_), Err(TaskClosedEvent::Reach(_)), _) => { (TaskState::Connected(_, _), Err(TaskClosedEvent::Reach(_)), _) => {
panic!("A TaskClosedEvent::Reach can only happen before we are connected \ panic!("A TaskClosedEvent::Reach can only happen before we are connected \
to a node; therefore the TaskState won't be Connected; QED"); to a node; therefore the TaskState won't be Connected; QED");
}, },
@ -433,15 +449,20 @@ where
}, },
HandledNodesEvent::NodeEvent { task, event } => { HandledNodesEvent::NodeEvent { task, event } => {
let peer_id = match task.user_data() { let peer_id = match task.user_data() {
TaskState::Connected(peer_id) => peer_id.clone(), TaskState::Connected(peer_id, _) => peer_id.clone(),
_ => panic!("we can only receive NodeEvent events from a task after we \ _ => panic!("we can only receive NodeEvent events from a task after we \
received a corresponding NodeReached event from that same task; \ received a corresponding NodeReached event from that same task; \
when we receive a NodeReached event, we ensure that the entry in \ when we receive a NodeReached event, we ensure that the entry in \
self.tasks is switched to the Connected state; QED"), self.tasks is switched to the Connected state; QED"),
}; };
drop(task);
Async::Ready(CollectionEvent::NodeEvent { Async::Ready(CollectionEvent::NodeEvent {
peer_id, // TODO: normally we'd build a `PeerMut` manually here, but the borrow checker
// doesn't like it
peer: self.peer_mut(&peer_id)
.expect("we can only receive NodeEvent events from a task after we \
received a corresponding NodeReached event from that same task;\
when that happens, peer_mut will always return Some; QED"),
event, event,
}) })
} }
@ -476,33 +497,60 @@ impl fmt::Display for InterruptError {
impl error::Error for InterruptError {} impl error::Error for InterruptError {}
/// Access to a peer in the collection. /// Access to a peer in the collection.
pub struct PeerMut<'a, TInEvent, TPeerId = PeerId> { pub struct PeerMut<'a, TInEvent, TUserData, TPeerId = PeerId> {
inner: HandledNodesTask<'a, TInEvent, TaskState<TPeerId>>, inner: HandledNodesTask<'a, TInEvent, TaskState<TPeerId, TUserData>>,
nodes: &'a mut FnvHashMap<TPeerId, TaskId>, nodes: &'a mut FnvHashMap<TPeerId, TaskId>,
} }
impl<'a, TInEvent, TPeerId> PeerMut<'a, TInEvent, TPeerId> impl<'a, TInEvent, TUserData, TPeerId> PeerMut<'a, TInEvent, TUserData, TPeerId>
where where
TPeerId: Eq + Hash, TPeerId: Eq + Hash,
{ {
/// Returns the identifier of the peer.
pub fn id(&self) -> &TPeerId {
match self.inner.user_data() {
TaskState::Connected(peer_id, _) => peer_id,
_ => panic!("A PeerMut is only ever constructed from a peer in the connected \
state; QED")
}
}
/// Returns the user data that was stored in the collections when we accepted the connection.
pub fn user_data(&self) -> &TUserData {
match self.inner.user_data() {
TaskState::Connected(_, user_data) => user_data,
_ => panic!("A PeerMut is only ever constructed from a peer in the connected \
state; QED")
}
}
/// Returns the user data that was stored in the collections when we accepted the connection.
pub fn user_data_mut(&mut self) -> &mut TUserData {
match self.inner.user_data_mut() {
TaskState::Connected(_, user_data) => user_data,
_ => panic!("A PeerMut is only ever constructed from a peer in the connected \
state; QED")
}
}
/// Sends an event to the given node. /// Sends an event to the given node.
#[inline] #[inline]
pub fn send_event(&mut self, event: TInEvent) { pub fn send_event(&mut self, event: TInEvent) {
self.inner.send_event(event) self.inner.send_event(event)
} }
/// Closes the connections to this node. /// Closes the connections to this node. Returns the user data.
/// ///
/// No further event will be generated for this node. /// No further event will be generated for this node.
pub fn close(self) { pub fn close(self) -> TUserData {
if let TaskState::Connected(peer_id) = self.inner.user_data() { let task_id = self.inner.id();
if let TaskState::Connected(peer_id, user_data) = self.inner.close() {
let old_task_id = self.nodes.remove(&peer_id); let old_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(old_task_id, Some(self.inner.id())); debug_assert_eq!(old_task_id, Some(task_id));
user_data
} else { } else {
panic!("a PeerMut can only be created if an entry is present in nodes; an entry in \ panic!("a PeerMut can only be created if an entry is present in nodes; an entry in \
nodes always matched a Connected entry in the tasks; QED"); nodes always matched a Connected entry in the tasks; QED");
}; }
self.inner.close();
} }
} }

View File

@ -31,7 +31,7 @@ use crate::nodes::NodeHandlerEvent;
use std::{io, sync::Arc}; use std::{io, sync::Arc};
use parking_lot::Mutex; use parking_lot::Mutex;
type TestCollectionStream = CollectionStream<InEvent, OutEvent, Handler, io::Error, io::Error>; type TestCollectionStream = CollectionStream<InEvent, OutEvent, Handler, io::Error, io::Error, ()>;
#[test] #[test]
fn has_connection_is_false_before_a_connection_has_been_made() { fn has_connection_is_false_before_a_connection_has_been_made() {
@ -107,7 +107,7 @@ fn accepting_a_node_yields_new_entry() {
} }
2 => { 2 => {
assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
let (accept_ev, accepted_peer_id) = reach_ev.accept(); let (accept_ev, accepted_peer_id) = reach_ev.accept(());
assert_eq!(accepted_peer_id, peer_id); assert_eq!(accepted_peer_id, peer_id);
assert_matches!(accept_ev, CollectionNodeAccept::NewEntry); assert_matches!(accept_ev, CollectionNodeAccept::NewEntry);
}); });
@ -159,7 +159,7 @@ fn events_in_a_node_reaches_the_collection_stream() {
let mut cs = cs_fut.lock(); let mut cs = cs_fut.lock();
cs.broadcast_event(&InEvent::NextState); cs.broadcast_event(&InEvent::NextState);
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
reach_ev.accept(); reach_ev.accept(());
}); });
Ok(Async::Ready(())) Ok(Async::Ready(()))
})).expect("tokio works"); })).expect("tokio works");
@ -168,7 +168,7 @@ fn events_in_a_node_reaches_the_collection_stream() {
rt.block_on(future::poll_fn(move || -> Poll<_, ()> { rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock(); let mut cs = cs_fut.lock();
cs.broadcast_event(&InEvent::NextState); cs.broadcast_event(&InEvent::NextState);
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => {
assert_matches!(event, OutEvent::Custom("init")); assert_matches!(event, OutEvent::Custom("init"));
}); });
Ok(Async::Ready(())) Ok(Async::Ready(()))
@ -179,7 +179,7 @@ fn events_in_a_node_reaches_the_collection_stream() {
rt.block_on(future::poll_fn(move || -> Poll<_, ()> { rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock(); let mut cs = cs_fut.lock();
cs.broadcast_event(&InEvent::NextState); cs.broadcast_event(&InEvent::NextState);
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => {
assert_matches!(event, OutEvent::Custom("from handler 1")); assert_matches!(event, OutEvent::Custom("from handler 1"));
}); });
Ok(Async::Ready(())) Ok(Async::Ready(()))
@ -189,7 +189,7 @@ fn events_in_a_node_reaches_the_collection_stream() {
rt.block_on(future::poll_fn(move || -> Poll<_, ()> { rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock(); let mut cs = cs_fut.lock();
cs.broadcast_event(&InEvent::NextState); cs.broadcast_event(&InEvent::NextState);
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer_id: _, event}) => { assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeEvent{peer: _, event}) => {
assert_matches!(event, OutEvent::Custom("from handler 2")); assert_matches!(event, OutEvent::Custom("from handler 2"));
}); });
Ok(Async::Ready(())) Ok(Async::Ready(()))
@ -253,7 +253,7 @@ fn task_closed_with_error_when_task_is_connected_yields_node_error() {
let mut cs = cs_fut.lock(); let mut cs = cs_fut.lock();
// NodeReached, accept the connection so the task transitions from Pending to Connected // NodeReached, accept the connection so the task transitions from Pending to Connected
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
reach_ev.accept(); reach_ev.accept(());
}); });
Ok(Async::Ready(())) Ok(Async::Ready(()))
})).expect("tokio works"); })).expect("tokio works");
@ -299,7 +299,7 @@ fn task_closed_ok_when_task_is_connected_yields_node_closed() {
let mut cs = cs_fut.lock(); let mut cs = cs_fut.lock();
// NodeReached, accept the connection so the task transitions from Pending to Connected // NodeReached, accept the connection so the task transitions from Pending to Connected
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
reach_ev.accept(); reach_ev.accept(());
}); });
Ok(Async::Ready(())) Ok(Async::Ready(()))
})).expect("tokio works"); })).expect("tokio works");
@ -312,7 +312,7 @@ fn task_closed_ok_when_task_is_connected_yields_node_closed() {
rt.block_on(future::poll_fn(move || -> Poll<_, ()> { rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
let mut cs = cs_fut.lock(); let mut cs = cs_fut.lock();
// Node is closed normally: TaskClosed, Ok(()) // Node is closed normally: TaskClosed, Ok(())
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeClosed{ peer_id: peer_id_in_event }) => { assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeClosed{ peer_id: peer_id_in_event, .. }) => {
assert_eq!(peer_id_in_event, peer_id); assert_eq!(peer_id_in_event, peer_id);
}); });
Ok(Async::Ready(())) Ok(Async::Ready(()))
@ -363,7 +363,7 @@ fn interrupting_an_established_connection_is_err() {
let mut cs = cs_fut.lock(); let mut cs = cs_fut.lock();
// NodeReached, accept the connection so the task transitions from Pending to Connected // NodeReached, accept the connection so the task transitions from Pending to Connected
assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
reach_ev.accept(); reach_ev.accept(());
}); });
Ok(Async::Ready(())) Ok(Async::Ready(()))
})).expect("tokio works"); })).expect("tokio works");

View File

@ -369,11 +369,11 @@ impl<'a, TInEvent, TUserData> Task<'a, TInEvent, TUserData> {
*self.inner.key() *self.inner.key()
} }
/// Closes the task. /// Closes the task. Returns the user data.
/// ///
/// No further event will be generated for this task. /// No further event will be generated for this task.
pub fn close(self) { pub fn close(self) -> TUserData {
self.inner.remove(); self.inner.remove().1
} }
} }

View File

@ -61,7 +61,7 @@ where
listeners: ListenersStream<TTrans>, listeners: ListenersStream<TTrans>,
/// The nodes currently active. /// The nodes currently active.
active_nodes: CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, TPeerId>, active_nodes: CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, (), TPeerId>,
/// The reach attempts of the swarm. /// The reach attempts of the swarm.
/// This needs to be a separate struct in order to handle multiple mutable borrows issues. /// This needs to be a separate struct in order to handle multiple mutable borrows issues.
@ -507,7 +507,7 @@ where TTrans: Transport
/// Address used to send back data to the remote. /// Address used to send back data to the remote.
send_back_addr: Multiaddr, send_back_addr: Multiaddr,
/// Reference to the `active_nodes` field of the swarm. /// Reference to the `active_nodes` field of the swarm.
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, TPeerId>, active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, (), TPeerId>,
/// Reference to the `other_reach_attempts` field of the swarm. /// Reference to the `other_reach_attempts` field of the swarm.
other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>, other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>,
} }
@ -998,6 +998,7 @@ where
Async::Ready(CollectionEvent::NodeError { Async::Ready(CollectionEvent::NodeError {
peer_id, peer_id,
error, error,
..
}) => { }) => {
let endpoint = self.reach_attempts.connected_points.remove(&peer_id) let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
.expect("We insert into connected_points whenever a connection is \ .expect("We insert into connected_points whenever a connection is \
@ -1012,7 +1013,7 @@ where
error, error,
}; };
} }
Async::Ready(CollectionEvent::NodeClosed { peer_id }) => { Async::Ready(CollectionEvent::NodeClosed { peer_id, .. }) => {
let endpoint = self.reach_attempts.connected_points.remove(&peer_id) let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
.expect("We insert into connected_points whenever a connection is \ .expect("We insert into connected_points whenever a connection is \
opened and remove only when a connection is closed; the \ opened and remove only when a connection is closed; the \
@ -1022,9 +1023,9 @@ where
action = Default::default(); action = Default::default();
out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint }; out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint };
} }
Async::Ready(CollectionEvent::NodeEvent { peer_id, event }) => { Async::Ready(CollectionEvent::NodeEvent { peer, event }) => {
action = Default::default(); action = Default::default();
out_event = RawSwarmEvent::NodeEvent { peer_id, event }; out_event = RawSwarmEvent::NodeEvent { peer_id: peer.id().clone(), event };
} }
} }
@ -1073,7 +1074,7 @@ impl<THandler, TPeerId> Default for ActionItem<THandler, TPeerId> {
/// > panics will likely happen. /// > panics will likely happen.
fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>( fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>(
reach_attempts: &mut ReachAttempts<TPeerId>, reach_attempts: &mut ReachAttempts<TPeerId>,
event: CollectionReachEvent<'_, TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, TPeerId>, event: CollectionReachEvent<'_, TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, (), TPeerId>,
) -> (ActionItem<THandler, TPeerId>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>) ) -> (ActionItem<THandler, TPeerId>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>)
where where
TTrans: Transport<Output = (TPeerId, TMuxer)> + Clone, TTrans: Transport<Output = (TPeerId, TMuxer)> + Clone,
@ -1133,8 +1134,8 @@ where
} }
}; };
let (outcome, peer_id) = event.accept(); let (outcome, peer_id) = event.accept(());
if outcome == CollectionNodeAccept::ReplacedExisting { if let CollectionNodeAccept::ReplacedExisting(()) = outcome {
let closed_endpoint = closed_endpoint let closed_endpoint = closed_endpoint
.expect("We insert into connected_points whenever a connection is opened and \ .expect("We insert into connected_points whenever a connection is opened and \
remove only when a connection is closed; the underlying API is \ remove only when a connection is closed; the underlying API is \
@ -1171,8 +1172,8 @@ where
let closed_endpoint = reach_attempts.connected_points let closed_endpoint = reach_attempts.connected_points
.insert(event.peer_id().clone(), opened_endpoint.clone()); .insert(event.peer_id().clone(), opened_endpoint.clone());
let (outcome, peer_id) = event.accept(); let (outcome, peer_id) = event.accept(());
if outcome == CollectionNodeAccept::ReplacedExisting { if let CollectionNodeAccept::ReplacedExisting(()) = outcome {
let closed_endpoint = closed_endpoint let closed_endpoint = closed_endpoint
.expect("We insert into connected_points whenever a connection is opened and \ .expect("We insert into connected_points whenever a connection is opened and \
remove only when a connection is closed; the underlying API is guaranteed \ remove only when a connection is closed; the underlying API is guaranteed \
@ -1519,7 +1520,7 @@ pub struct PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr,
where TTrans: Transport, where TTrans: Transport,
{ {
/// Reference to the `active_nodes` of the parent. /// Reference to the `active_nodes` of the parent.
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, TPeerId>, active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, (), TPeerId>,
/// Reference to the `connected_points` field of the parent. /// Reference to the `connected_points` field of the parent.
connected_points: &'a mut FnvHashMap<TPeerId, ConnectedPoint>, connected_points: &'a mut FnvHashMap<TPeerId, ConnectedPoint>,
/// Reference to the `out_reach_attempts` field of the parent. /// Reference to the `out_reach_attempts` field of the parent.
@ -1576,7 +1577,7 @@ where
TTrans: Transport TTrans: Transport
{ {
attempt: OccupiedEntry<'a, TPeerId, OutReachAttempt>, attempt: OccupiedEntry<'a, TPeerId, OutReachAttempt>,
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, TPeerId>, active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, (), TPeerId>,
} }
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>