Make it possible to accept/deny nodes in CollectionsStream (#512)

* Provide poll() as a regular method when we can't fail

* Add a CollectionReachEvent that allows accepting

* Add small TODO in handled_node

* Implement Debug on collection events

* Implement Debug for CollectionStream

* Implement Debug for structs in handled_node_tasks

* Attempt to fix compilation errors with stable Rust

* Prove the unwrap
This commit is contained in:
Pierre Krieger
2018-10-01 14:49:17 +02:00
committed by GitHub
parent 4ff7d686a4
commit 7208bba92b
5 changed files with 692 additions and 489 deletions

View File

@ -25,15 +25,13 @@ use nodes::node::Substream;
use nodes::handled_node_tasks::{HandledNodesEvent, HandledNodesTasks}; use nodes::handled_node_tasks::{HandledNodesEvent, HandledNodesTasks};
use nodes::handled_node_tasks::{Task as HandledNodesTask, TaskId}; use nodes::handled_node_tasks::{Task as HandledNodesTask, TaskId};
use nodes::handled_node::NodeHandler; use nodes::handled_node::NodeHandler;
use std::collections::hash_map::Entry; use std::{collections::hash_map::Entry, fmt, mem};
use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use void::Void;
use {Multiaddr, PeerId}; use {Multiaddr, PeerId};
// TODO: make generic over PeerId // TODO: make generic over PeerId
/// Implementation of `Stream` that handles a collection of nodes. /// Implementation of `Stream` that handles a collection of nodes.
// TODO: implement Debug
pub struct CollectionStream<TInEvent, TOutEvent> { pub struct CollectionStream<TInEvent, TOutEvent> {
/// Object that handles the tasks. /// Object that handles the tasks.
inner: HandledNodesTasks<TInEvent, TOutEvent>, inner: HandledNodesTasks<TInEvent, TOutEvent>,
@ -45,6 +43,23 @@ pub struct CollectionStream<TInEvent, TOutEvent> {
tasks: FnvHashMap<TaskId, TaskState>, tasks: FnvHashMap<TaskId, TaskState>,
} }
impl<TInEvent, TOutEvent> fmt::Debug for CollectionStream<TInEvent, TOutEvent> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
let mut list = f.debug_list();
for (id, task) in &self.tasks {
match *task {
TaskState::Pending => {
list.entry(&format!("Pending({:?})", ReachAttemptId(*id)))
},
TaskState::Connected(ref peer_id) => {
list.entry(&format!("Connected({:?})", peer_id))
}
};
}
list.finish()
}
}
/// State of a task. /// State of a task.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
enum TaskState { enum TaskState {
@ -55,26 +70,10 @@ enum TaskState {
} }
/// Event that can happen on the `CollectionStream`. /// Event that can happen on the `CollectionStream`.
// TODO: implement Debug pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a> {
pub enum CollectionEvent<TOutEvent> { /// A connection to a node has succeeded. You must use the provided event in order to accept
/// A connection to a node has succeeded. /// the connection.
NodeReached { NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent>),
/// Identifier of the node.
peer_id: PeerId,
/// Identifier of the reach attempt that succeeded.
id: ReachAttemptId,
},
/// A connection to a node has succeeded and replaces a former connection.
///
/// The opened substreams of the former node will keep working (unless the remote decides to
/// close them).
NodeReplaced {
/// Identifier of the node.
peer_id: PeerId,
/// Identifier of the reach attempt that succeeded.
id: ReachAttemptId,
},
/// A connection to a node has been closed. /// A connection to a node has been closed.
/// ///
@ -110,6 +109,148 @@ pub enum CollectionEvent<TOutEvent> {
}, },
} }
impl<'a, TInEvent, TOutEvent> fmt::Debug for CollectionEvent<'a, TInEvent, TOutEvent>
where TOutEvent: fmt::Debug
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
CollectionEvent::NodeReached(ref inner) => {
f.debug_tuple("CollectionEvent::NodeReached")
.field(inner)
.finish()
},
CollectionEvent::NodeClosed { ref peer_id } => {
f.debug_struct("CollectionEvent::NodeClosed")
.field("peer_id", peer_id)
.finish()
},
CollectionEvent::NodeError { ref peer_id, ref error } => {
f.debug_struct("CollectionEvent::NodeError")
.field("peer_id", peer_id)
.field("error", error)
.finish()
},
CollectionEvent::ReachError { ref id, ref error } => {
f.debug_struct("CollectionEvent::ReachError")
.field("id", id)
.field("error", error)
.finish()
},
CollectionEvent::NodeEvent { ref peer_id, ref event } => {
f.debug_struct("CollectionEvent::NodeEvent")
.field("peer_id", peer_id)
.field("event", event)
.finish()
},
}
}
}
/// Event that happens when we reach a node.
#[must_use = "The node reached event is used to accept the newly-opened connection"]
pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a> {
/// Peer id we connected to.
peer_id: PeerId,
/// The task id that reached the node.
id: TaskId,
/// The `CollectionStream` we are referencing.
parent: &'a mut CollectionStream<TInEvent, TOutEvent>,
}
impl<'a, TInEvent, TOutEvent> CollectionReachEvent<'a, TInEvent, TOutEvent> {
/// Returns the peer id the node that has been reached.
#[inline]
pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}
/// Returns the reach attempt that reached the node.
#[inline]
pub fn reach_attempt_id(&self) -> ReachAttemptId {
ReachAttemptId(self.id)
}
/// Returns `true` if accepting this reached node would replace an existing connection to that
/// node.
#[inline]
pub fn would_replace(&self) -> bool {
self.parent.nodes.contains_key(&self.peer_id)
}
/// Accepts the new node.
pub fn accept(self) -> (CollectionNodeAccept, PeerId) {
// Set the state of the task to `Connected`.
let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id);
let _former_state = self.parent.tasks.insert(self.id, TaskState::Connected(self.peer_id.clone()));
debug_assert_eq!(_former_state, Some(TaskState::Pending));
// It is possible that we already have a task connected to the same peer. In this
// case, we need to emit a `NodeReplaced` event.
let ret_value = if let Some(former_task_id) = former_task_id {
self.parent.inner.task(former_task_id)
.expect("whenever we receive a TaskClosed event or close a node, we remove the \
corresponding entry from self.nodes ; therefore all elements in \
self.nodes are valid tasks in the HandledNodesTasks ; qed")
.close();
let _former_other_state = self.parent.tasks.remove(&former_task_id);
debug_assert_eq!(_former_other_state, Some(TaskState::Connected(self.peer_id.clone())));
// TODO: we unfortunately have to clone the peer id here
(CollectionNodeAccept::ReplacedExisting, self.peer_id.clone())
} else {
// TODO: we unfortunately have to clone the peer id here
(CollectionNodeAccept::NewEntry, self.peer_id.clone())
};
// Don't run the destructor.
mem::forget(self);
ret_value
}
/// Denies the node.
///
/// Has the same effect as dropping the event without accepting it.
#[inline]
pub fn deny(self) -> PeerId {
// TODO: we unfortunately have to clone the id here, in order to be explicit
let peer_id = self.peer_id.clone();
drop(self); // Just to be explicit
peer_id
}
}
impl<'a, TInEvent, TOutEvent> fmt::Debug for CollectionReachEvent<'a, TInEvent, TOutEvent> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("CollectionReachEvent")
.field("peer_id", &self.peer_id)
.field("reach_attempt_id", &self.reach_attempt_id())
.finish()
}
}
impl<'a, TInEvent, TOutEvent> Drop for CollectionReachEvent<'a, TInEvent, TOutEvent> {
fn drop(&mut self) {
let task_state = self.parent.tasks.remove(&self.id);
debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false });
self.parent.inner.task(self.id)
.expect("we create the CollectionReachEvent with a valid task id ; the \
CollectionReachEvent mutably borrows the collection, therefore nothing \
can delete this task during the lifetime of the CollectionReachEvent ; \
therefore the task is still valid when we delete it ; qed")
.close();
}
}
/// Outcome of accepting a node.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CollectionNodeAccept {
/// We replaced an existing node.
ReplacedExisting,
/// We didn't replace anything existing.
NewEntry,
}
/// Identifier for a future that attempts to reach a node. /// Identifier for a future that attempts to reach a node.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReachAttemptId(TaskId); pub struct ReachAttemptId(TaskId);
@ -215,6 +356,81 @@ impl<TInEvent, TOutEvent> CollectionStream<TInEvent, TOutEvent> {
pub fn connections(&self) -> impl Iterator<Item = &PeerId> { pub fn connections(&self) -> impl Iterator<Item = &PeerId> {
self.nodes.keys() self.nodes.keys()
} }
/// Provides an API similar to `Stream`, except that it cannot error.
///
/// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to
/// > remove the `Err` variant, but also because we want the `CollectionStream` to stay
/// > borrowed if necessary.
pub fn poll(&mut self) -> Async<Option<CollectionEvent<TInEvent, TOutEvent>>> {
let item = match self.inner.poll() {
Async::Ready(item) => item,
Async::NotReady => return Async::NotReady,
};
match item {
Some(HandledNodesEvent::TaskClosed { id, result }) => {
match (self.tasks.remove(&id), result) {
(Some(TaskState::Pending), Err(err)) => {
Async::Ready(Some(CollectionEvent::ReachError {
id: ReachAttemptId(id),
error: err,
}))
},
(Some(TaskState::Pending), Ok(())) => {
// TODO: this variant shouldn't happen ; prove this
Async::Ready(Some(CollectionEvent::ReachError {
id: ReachAttemptId(id),
error: IoError::new(IoErrorKind::Other, "couldn't reach the node"),
}))
},
(Some(TaskState::Connected(peer_id)), Ok(())) => {
let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id));
Async::Ready(Some(CollectionEvent::NodeClosed {
peer_id,
}))
},
(Some(TaskState::Connected(peer_id)), Err(err)) => {
let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id));
Async::Ready(Some(CollectionEvent::NodeError {
peer_id,
error: err,
}))
},
(None, _) => {
panic!("self.tasks is always kept in sync with the tasks in self.inner ; \
when we add a task in self.inner we add a corresponding entry in \
self.tasks, and remove the entry only when the task is closed ; \
qed")
},
}
},
Some(HandledNodesEvent::NodeReached { id, peer_id }) => {
Async::Ready(Some(CollectionEvent::NodeReached(CollectionReachEvent {
parent: self,
id,
peer_id,
})))
},
Some(HandledNodesEvent::NodeEvent { id, event }) => {
let peer_id = match self.tasks.get(&id) {
Some(TaskState::Connected(peer_id)) => peer_id.clone(),
_ => panic!("we can only receive NodeEvent events from a task after we \
received a corresponding NodeReached event from that same task ; \
when we receive a NodeReached event, we ensure that the entry in \
self.tasks is switched to the Connected state ; qed"),
};
Async::Ready(Some(CollectionEvent::NodeEvent {
peer_id,
event,
}))
}
None => Async::Ready(None),
}
}
} }
/// Access to a peer in the collection. /// Access to a peer in the collection.
@ -247,98 +463,3 @@ impl<'a, TInEvent> PeerMut<'a, TInEvent> {
self.inner.close(); self.inner.close();
} }
} }
impl<TInEvent, TOutEvent> Stream for CollectionStream<TInEvent, TOutEvent> {
type Item = CollectionEvent<TOutEvent>;
type Error = Void; // TODO: use ! once stable
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let item = try_ready!(self.inner.poll());
match item {
Some(HandledNodesEvent::TaskClosed { id, result }) => {
match (self.tasks.remove(&id), result) {
(Some(TaskState::Pending), Err(err)) => {
Ok(Async::Ready(Some(CollectionEvent::ReachError {
id: ReachAttemptId(id),
error: err,
})))
},
(Some(TaskState::Pending), Ok(())) => {
// TODO: this variant shouldn't happen ; prove this
Ok(Async::Ready(Some(CollectionEvent::ReachError {
id: ReachAttemptId(id),
error: IoError::new(IoErrorKind::Other, "couldn't reach the node"),
})))
},
(Some(TaskState::Connected(peer_id)), Ok(())) => {
let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id));
Ok(Async::Ready(Some(CollectionEvent::NodeClosed {
peer_id,
})))
},
(Some(TaskState::Connected(peer_id)), Err(err)) => {
let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id));
Ok(Async::Ready(Some(CollectionEvent::NodeError {
peer_id,
error: err,
})))
},
(None, _) => {
panic!("self.tasks is always kept in sync with the tasks in self.inner ; \
when we add a task in self.inner we add a corresponding entry in \
self.tasks, and remove the entry only when the task is closed ; \
qed")
},
}
},
Some(HandledNodesEvent::NodeReached { id, peer_id }) => {
// Set the state of the task to `Connected`.
let former_task_id = self.nodes.insert(peer_id.clone(), id);
let _former_state = self.tasks.insert(id, TaskState::Connected(peer_id.clone()));
debug_assert_eq!(_former_state, Some(TaskState::Pending));
// It is possible that we already have a task connected to the same peer. In this
// case, we need to emit a `NodeReplaced` event.
if let Some(former_task_id) = former_task_id {
self.inner.task(former_task_id)
.expect("whenever we receive a TaskClosed event or close a node, we \
remove the corresponding entry from self.nodes ; therefore all \
elements in self.nodes are valid tasks in the \
HandledNodesTasks ; qed")
.close();
let _former_other_state = self.tasks.remove(&former_task_id);
debug_assert_eq!(_former_other_state, Some(TaskState::Connected(peer_id.clone())));
Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
peer_id,
id: ReachAttemptId(id),
})))
} else {
Ok(Async::Ready(Some(CollectionEvent::NodeReached {
peer_id,
id: ReachAttemptId(id),
})))
}
},
Some(HandledNodesEvent::NodeEvent { id, event }) => {
let peer_id = match self.tasks.get(&id) {
Some(TaskState::Connected(peer_id)) => peer_id.clone(),
_ => panic!("we can only receive NodeEvent events from a task after we \
received a corresponding NodeReached event from that same task ; \
when we receive a NodeReached event, we ensure that the entry in \
self.tasks is switched to the Connected state ; qed"),
};
Ok(Async::Ready(Some(CollectionEvent::NodeEvent {
peer_id,
event,
})))
}
None => Ok(Async::Ready(None)),
}
}
}

View File

@ -28,6 +28,9 @@ use Multiaddr;
/// ///
/// > Note: When implementing the various methods, don't forget that you have to register the /// > Note: When implementing the various methods, don't forget that you have to register the
/// > task that was the latest to poll and notify it. /// > task that was the latest to poll and notify it.
// TODO: right now it is possible for a node handler to be built, then shut down right after if we
// realize we dialed the wrong peer for example ; this could be surprising and should either
// be documented or changed (favouring the "documented" right now)
pub trait NodeHandler<TSubstream> { pub trait NodeHandler<TSubstream> {
/// Custom event that can be received from the outside. /// Custom event that can be received from the outside.
type InEvent; type InEvent;

View File

@ -26,7 +26,7 @@ use nodes::handled_node::{HandledNode, NodeHandler};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::collections::hash_map::{Entry, OccupiedEntry}; use std::collections::hash_map::{Entry, OccupiedEntry};
use std::io::Error as IoError; use std::io::Error as IoError;
use std::mem; use std::{fmt, mem};
use tokio_executor; use tokio_executor;
use void::Void; use void::Void;
use {Multiaddr, PeerId}; use {Multiaddr, PeerId};
@ -69,6 +69,14 @@ pub struct HandledNodesTasks<TInEvent, TOutEvent> {
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent>, TaskId)>, events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent>, TaskId)>,
} }
impl<TInEvent, TOutEvent> fmt::Debug for HandledNodesTasks<TInEvent, TOutEvent> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_list()
.entries(self.tasks.keys().cloned())
.finish()
}
}
/// Event that can happen on the `HandledNodesTasks`. /// Event that can happen on the `HandledNodesTasks`.
#[derive(Debug)] #[derive(Debug)]
pub enum HandledNodesEvent<TOutEvent> { pub enum HandledNodesEvent<TOutEvent> {
@ -183,6 +191,55 @@ impl<TInEvent, TOutEvent> HandledNodesTasks<TInEvent, TOutEvent> {
pub fn tasks<'a>(&'a self) -> impl Iterator<Item = TaskId> + 'a { pub fn tasks<'a>(&'a self) -> impl Iterator<Item = TaskId> + 'a {
self.tasks.keys().cloned() self.tasks.keys().cloned()
} }
/// Provides an API similar to `Stream`, except that it cannot error.
pub fn poll(&mut self) -> Async<Option<HandledNodesEvent<TOutEvent>>> {
for to_spawn in self.to_spawn.drain() {
tokio_executor::spawn(to_spawn);
}
loop {
match self.events_rx.poll() {
Ok(Async::Ready(Some((message, task_id)))) => {
// If the task id is no longer in `self.tasks`, that means that the user called
// `close()` on this task earlier. Therefore no new event should be generated
// for this task.
if !self.tasks.contains_key(&task_id) {
continue;
};
match message {
InToExtMessage::NodeEvent(event) => {
break Async::Ready(Some(HandledNodesEvent::NodeEvent {
id: task_id,
event,
}));
},
InToExtMessage::NodeReached(peer_id) => {
break Async::Ready(Some(HandledNodesEvent::NodeReached {
id: task_id,
peer_id,
}));
},
InToExtMessage::TaskClosed(result) => {
let _ = self.tasks.remove(&task_id);
break Async::Ready(Some(HandledNodesEvent::TaskClosed {
id: task_id, result
}));
},
}
}
Ok(Async::NotReady) => {
break Async::NotReady;
}
Ok(Async::Ready(None)) => {
unreachable!("The sender is in self as well, therefore the receiver never \
closes.")
},
Err(()) => unreachable!("An unbounded receiver never errors"),
}
}
}
} }
/// Access to a task in the collection. /// Access to a task in the collection.
@ -214,60 +271,26 @@ impl<'a, TInEvent> Task<'a, TInEvent> {
} }
} }
impl<'a, TInEvent> fmt::Debug for Task<'a, TInEvent> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_tuple("Task")
.field(&self.id())
.finish()
}
}
impl<TInEvent, TOutEvent> Stream for HandledNodesTasks<TInEvent, TOutEvent> { impl<TInEvent, TOutEvent> Stream for HandledNodesTasks<TInEvent, TOutEvent> {
type Item = HandledNodesEvent<TOutEvent>; type Item = HandledNodesEvent<TOutEvent>;
type Error = Void; // TODO: use ! once stable type Error = Void; // TODO: use ! once stable
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
for to_spawn in self.to_spawn.drain() { Ok(self.poll())
tokio_executor::spawn(to_spawn);
}
loop {
match self.events_rx.poll() {
Ok(Async::Ready(Some((message, task_id)))) => {
// If the task id is no longer in `self.tasks`, that means that the user called
// `close()` on this task earlier. Therefore no new event should be generated
// for this task.
if !self.tasks.contains_key(&task_id) {
continue;
};
match message {
InToExtMessage::NodeEvent(event) => {
break Ok(Async::Ready(Some(HandledNodesEvent::NodeEvent {
id: task_id,
event,
})));
},
InToExtMessage::NodeReached(peer_id) => {
break Ok(Async::Ready(Some(HandledNodesEvent::NodeReached {
id: task_id,
peer_id,
})));
},
InToExtMessage::TaskClosed(result) => {
let _ = self.tasks.remove(&task_id);
break Ok(Async::Ready(Some(HandledNodesEvent::TaskClosed {
id: task_id, result
})));
},
}
}
Ok(Async::NotReady) => {
break Ok(Async::NotReady);
}
Ok(Async::Ready(None)) => {
unreachable!("The sender is in self as well, therefore the receiver never \
closes.")
},
Err(()) => unreachable!("An unbounded receiver never errors"),
}
}
} }
} }
/// Message to transmit from a task to the public API. /// Message to transmit from a task to the public API.
#[derive(Debug)]
enum InToExtMessage<TOutEvent> { enum InToExtMessage<TOutEvent> {
/// A connection to a node has succeeded. /// A connection to a node has succeeded.
NodeReached(PeerId), NodeReached(PeerId),

View File

@ -126,16 +126,9 @@ where
pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> { pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
self.listeners.iter().map(|l| &l.address) self.listeners.iter().map(|l| &l.address)
} }
}
impl<TTrans> Stream for ListenersStream<TTrans> /// Provides an API similar to `Stream`, except that it cannot error.
where pub fn poll(&mut self) -> Async<Option<ListenersEvent<TTrans>>> {
TTrans: Transport,
{
type Item = ListenersEvent<TTrans>;
type Error = Void; // TODO: use ! once stable
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// We remove each element from `listeners` one by one and add them back. // We remove each element from `listeners` one by one and add them back.
for n in (0..self.listeners.len()).rev() { for n in (0..self.listeners.len()).rev() {
let mut listener = self.listeners.swap_remove(n); let mut listener = self.listeners.swap_remove(n);
@ -146,30 +139,43 @@ where
Ok(Async::Ready(Some(upgrade))) => { Ok(Async::Ready(Some(upgrade))) => {
let listen_addr = listener.address.clone(); let listen_addr = listener.address.clone();
self.listeners.push(listener); self.listeners.push(listener);
return Ok(Async::Ready(Some(ListenersEvent::Incoming { return Async::Ready(Some(ListenersEvent::Incoming {
upgrade, upgrade,
listen_addr, listen_addr,
}))); }));
} }
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
return Ok(Async::Ready(Some(ListenersEvent::Closed { return Async::Ready(Some(ListenersEvent::Closed {
listen_addr: listener.address, listen_addr: listener.address,
listener: listener.listener, listener: listener.listener,
result: Ok(()), result: Ok(()),
}))); }));
} }
Err(err) => { Err(err) => {
return Ok(Async::Ready(Some(ListenersEvent::Closed { return Async::Ready(Some(ListenersEvent::Closed {
listen_addr: listener.address, listen_addr: listener.address,
listener: listener.listener, listener: listener.listener,
result: Err(err), result: Err(err),
}))); }));
} }
} }
} }
// We register the current task to be waken up if a new listener is added. // We register the current task to be waken up if a new listener is added.
Ok(Async::NotReady) Async::NotReady
}
}
impl<TTrans> Stream for ListenersStream<TTrans>
where
TTrans: Transport,
{
type Item = ListenersEvent<TTrans>;
type Error = Void; // TODO: use ! once stable
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Ok(self.poll())
} }
} }

View File

@ -22,7 +22,7 @@ use fnv::FnvHashMap;
use futures::{prelude::*, future}; use futures::{prelude::*, future};
use muxing::StreamMuxer; use muxing::StreamMuxer;
use nodes::collection::{ use nodes::collection::{
CollectionEvent, CollectionStream, PeerMut as CollecPeerMut, ReachAttemptId, CollectionEvent, CollectionNodeAccept, CollectionReachEvent, CollectionStream, PeerMut as CollecPeerMut, ReachAttemptId,
}; };
use nodes::handled_node::NodeHandler; use nodes::handled_node::NodeHandler;
use nodes::listeners::{ListenersEvent, ListenersStream}; use nodes::listeners::{ListenersEvent, ListenersStream};
@ -43,6 +43,15 @@ where
/// The nodes currently active. /// The nodes currently active.
active_nodes: CollectionStream<TInEvent, TOutEvent>, active_nodes: CollectionStream<TInEvent, TOutEvent>,
/// The reach attempts of the swarm.
/// This needs to be a separate struct in order to handle multiple mutable borrows issues.
reach_attempts: ReachAttempts,
/// Object that builds new handlers.
handler_build: THandlerBuild,
}
struct ReachAttempts {
/// Attempts to reach a peer. /// Attempts to reach a peer.
out_reach_attempts: FnvHashMap<PeerId, OutReachAttempt>, out_reach_attempts: FnvHashMap<PeerId, OutReachAttempt>,
@ -52,9 +61,6 @@ where
/// For each peer ID we're connected to, contains the multiaddress we're connected to. /// For each peer ID we're connected to, contains the multiaddress we're connected to.
connected_multiaddresses: FnvHashMap<PeerId, Multiaddr>, connected_multiaddresses: FnvHashMap<PeerId, Multiaddr>,
/// Object that builds new handlers.
handler_build: THandlerBuild,
} }
/// Attempt to reach a peer. /// Attempt to reach a peer.
@ -271,9 +277,11 @@ where
Swarm { Swarm {
listeners: ListenersStream::new(transport), listeners: ListenersStream::new(transport),
active_nodes: CollectionStream::new(), active_nodes: CollectionStream::new(),
out_reach_attempts: Default::default(), reach_attempts: ReachAttempts {
other_reach_attempts: Vec::new(), out_reach_attempts: Default::default(),
connected_multiaddresses: Default::default(), other_reach_attempts: Vec::new(),
connected_multiaddresses: Default::default(),
},
handler_build: Default::default, handler_build: Default::default,
} }
} }
@ -285,9 +293,11 @@ where
Swarm { Swarm {
listeners: ListenersStream::new(transport), listeners: ListenersStream::new(transport),
active_nodes: CollectionStream::new(), active_nodes: CollectionStream::new(),
out_reach_attempts: Default::default(), reach_attempts: ReachAttempts {
other_reach_attempts: Vec::new(), out_reach_attempts: Default::default(),
connected_multiaddresses: Default::default(), other_reach_attempts: Vec::new(),
connected_multiaddresses: Default::default(),
},
handler_build, handler_build,
} }
} }
@ -347,7 +357,7 @@ where
}; };
let reach_id = self.active_nodes.add_reach_attempt(future, self.handler_build.new_handler()); let reach_id = self.active_nodes.add_reach_attempt(future, self.handler_build.new_handler());
self.other_reach_attempts self.reach_attempts.other_reach_attempts
.push((reach_id, ConnectedPoint::Dialer { address: addr })); .push((reach_id, ConnectedPoint::Dialer { address: addr }));
Ok(()) Ok(())
} }
@ -361,7 +371,7 @@ where
// a lot of API changes // a lot of API changes
#[inline] #[inline]
pub fn num_incoming_negotiated(&self) -> usize { pub fn num_incoming_negotiated(&self) -> usize {
self.other_reach_attempts self.reach_attempts.other_reach_attempts
.iter() .iter()
.filter(|&(_, endpoint)| endpoint.is_listener()) .filter(|&(_, endpoint)| endpoint.is_listener())
.count() .count()
@ -382,21 +392,21 @@ where
// the borrow checker yells at us. // the borrow checker yells at us.
if self.active_nodes.peer_mut(&peer_id).is_some() { if self.active_nodes.peer_mut(&peer_id).is_some() {
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id)); debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
return Peer::Connected(PeerConnected { return Peer::Connected(PeerConnected {
peer: self peer: self
.active_nodes .active_nodes
.peer_mut(&peer_id) .peer_mut(&peer_id)
.expect("we checked for Some just above"), .expect("we checked for Some just above"),
peer_id, peer_id,
connected_multiaddresses: &mut self.connected_multiaddresses, connected_multiaddresses: &mut self.reach_attempts.connected_multiaddresses,
}); });
} }
if self.out_reach_attempts.get_mut(&peer_id).is_some() { if self.reach_attempts.out_reach_attempts.get_mut(&peer_id).is_some() {
debug_assert!(!self.connected_multiaddresses.contains_key(&peer_id)); debug_assert!(!self.reach_attempts.connected_multiaddresses.contains_key(&peer_id));
return Peer::PendingConnect(PeerPendingConnect { return Peer::PendingConnect(PeerPendingConnect {
attempt: match self.out_reach_attempts.entry(peer_id.clone()) { attempt: match self.reach_attempts.out_reach_attempts.entry(peer_id.clone()) {
Entry::Occupied(e) => e, Entry::Occupied(e) => e,
Entry::Vacant(_) => panic!("we checked for Some just above"), Entry::Vacant(_) => panic!("we checked for Some just above"),
}, },
@ -404,25 +414,19 @@ where
}); });
} }
debug_assert!(!self.connected_multiaddresses.contains_key(&peer_id)); debug_assert!(!self.reach_attempts.connected_multiaddresses.contains_key(&peer_id));
Peer::NotConnected(PeerNotConnected { Peer::NotConnected(PeerNotConnected {
nodes: self, nodes: self,
peer_id, peer_id,
}) })
} }
/// Handles a node reached event from the collection. /// Starts dialing out a multiaddress. `rest` is the list of multiaddresses to attempt if
/// `first` fails.
/// ///
/// Returns an event to return from the stream. /// It is a logic error to call this method if we already have an outgoing attempt to the
/// /// given peer.
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise fn start_dial_out(&mut self, peer_id: PeerId, first: Multiaddr, rest: Vec<Multiaddr>)
/// > panics will likely happen.
fn handle_node_reached(
&mut self,
peer_id: PeerId,
reach_id: ReachAttemptId,
replaced: bool,
) -> SwarmEvent<TTrans, TOutEvent>
where where
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone, TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
TTrans::Dial: Send + 'static, TTrans::Dial: Send + 'static,
@ -433,217 +437,354 @@ where
TInEvent: Send + 'static, TInEvent: Send + 'static,
TOutEvent: Send + 'static, TOutEvent: Send + 'static,
{ {
// We first start looking in the incoming attempts. While this makes the code less optimal, let reach_id = match self.transport().clone().dial(first.clone()) {
// it also makes the logic easier. Ok(fut) => self.active_nodes.add_reach_attempt(fut, self.handler_build.new_handler()),
if let Some(in_pos) = self Err((_, addr)) => {
.other_reach_attempts let msg = format!("unsupported multiaddr {}", addr);
.iter() let fut = future::err(IoError::new(IoErrorKind::Other, msg));
.position(|i| i.0 == reach_id) self.active_nodes.add_reach_attempt::<_, _, future::FutureResult<Multiaddr, IoError>, _>(fut, self.handler_build.new_handler())
{ },
let (_, endpoint) = self.other_reach_attempts.swap_remove(in_pos);
// Clear the known multiaddress for this peer.
let closed_multiaddr = self.connected_multiaddresses.remove(&peer_id);
// Cancel any outgoing attempt to this peer.
if let Some(attempt) = self.out_reach_attempts.remove(&peer_id) {
debug_assert_ne!(attempt.id, reach_id);
self.active_nodes
.interrupt(attempt.id)
.expect("We insert in out_reach_attempts only when we call \
active_nodes.add_reach_attempt, and we remove only when we call \
interrupt or when a reach attempt succeeds or errors. Therefore the \
out_reach_attempts should always be in sync with the actual attempts");
}
if replaced {
return SwarmEvent::Replaced {
peer_id,
endpoint,
closed_multiaddr,
};
} else {
return SwarmEvent::Connected { peer_id, endpoint };
}
}
// Otherwise, try for outgoing attempts.
let is_outgoing_and_ok = if let Some(attempt) = self.out_reach_attempts.get(&peer_id) {
attempt.id == reach_id
} else {
false
}; };
// We only remove the attempt from `out_reach_attempts` if it both matches the reach id let former = self.reach_attempts.out_reach_attempts.insert(
// and the expected peer id. peer_id,
if is_outgoing_and_ok { OutReachAttempt {
let attempt = self.out_reach_attempts.remove(&peer_id) id: reach_id,
.expect("is_outgoing_and_ok is true only if self.out_reach_attempts.get(&peer_id) \ cur_attempted: first,
returned Some"); next_attempts: rest,
},
);
let closed_multiaddr = self.connected_multiaddresses debug_assert!(former.is_none());
.insert(peer_id.clone(), attempt.cur_attempted.clone());
let endpoint = ConnectedPoint::Dialer {
address: attempt.cur_attempted,
};
if replaced {
return SwarmEvent::Replaced {
peer_id,
endpoint,
closed_multiaddr,
};
} else {
return SwarmEvent::Connected { peer_id, endpoint };
}
}
// If in neither, check outgoing reach attempts again as we may have a public
// key mismatch.
let expected_peer_id = self
.out_reach_attempts
.iter()
.find(|(_, a)| a.id == reach_id)
.map(|(p, _)| p.clone());
if let Some(expected_peer_id) = expected_peer_id {
let attempt = self.out_reach_attempts.remove(&expected_peer_id)
.expect("expected_peer_id is a key that is grabbed from out_reach_attempts");
let num_remain = attempt.next_attempts.len();
let failed_addr = attempt.cur_attempted.clone();
// Since the `peer_id` (the unexpected peer id) is now successfully connected, we have
// to drop it from active_nodes.
// TODO: at the moment, a peer id mismatch can drop a legitimate connection, which is
// why we have to purge `connected_multiaddresses`.
// See https://github.com/libp2p/rust-libp2p/issues/502
self.connected_multiaddresses.remove(&peer_id);
self.active_nodes.peer_mut(&peer_id)
.expect("When we receive a NodeReached or NodeReplaced event from active_nodes, \
it is guaranteed that the PeerId is valid and therefore that \
active_nodes.peer_mut succeeds with this ID. handle_node_reached is \
called only to handle these events.")
.close();
if !attempt.next_attempts.is_empty() {
let mut attempt = attempt;
attempt.cur_attempted = attempt.next_attempts.remove(0);
attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) {
Ok(fut) => self.active_nodes.add_reach_attempt(fut, self.handler_build.new_handler()),
Err((_, addr)) => {
let msg = format!("unsupported multiaddr {}", addr);
let fut = future::err(IoError::new(IoErrorKind::Other, msg));
self.active_nodes.add_reach_attempt::<_, _, future::FutureResult<Multiaddr, IoError>, _>(fut, self.handler_build.new_handler())
},
};
self.out_reach_attempts.insert(expected_peer_id.clone(), attempt);
}
return SwarmEvent::PublicKeyMismatch {
remain_addrs_attempt: num_remain,
expected_peer_id,
actual_peer_id: peer_id,
multiaddr: failed_addr,
};
}
// We didn't find any entry in neither the outgoing connections not ingoing connections.
panic!("The API of collection guarantees that the id sent back in NodeReached and \
NodeReplaced events (which is where we call handle_node_reached) is one that was \
passed to add_reach_attempt. Whenever we call add_reach_attempt, we also insert \
at the same time an entry either in out_reach_attempts or in \
other_reach_attempts. It is therefore guaranteed that we find back this ID in \
either of these two sets");
} }
/// Handles a reach error event from the collection. /// Provides an API similar to `Stream`, except that it cannot error.
/// pub fn poll(&mut self) -> Async<Option<SwarmEvent<TTrans, TOutEvent>>>
/// Optionally returns an event to return from the stream.
///
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
/// > panics will likely happen.
fn handle_reach_error(
&mut self,
reach_id: ReachAttemptId,
error: IoError,
) -> Option<SwarmEvent<TTrans, TOutEvent>>
where where
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone, TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
TTrans::Dial: Send + 'static, TTrans::Dial: Send + 'static,
TTrans::MultiaddrFuture: Send + 'static, TTrans::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send, TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send, TMuxer::Substream: Send,
TInEvent: Send + 'static, TInEvent: Send + 'static,
TOutEvent: Send + 'static, TOutEvent: Send + 'static,
THandlerBuild: HandlerFactory<Handler = THandler>,
THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
{ {
// Search for the attempt in `out_reach_attempts`. // Start by polling the listeners for events.
// TODO: could be more optimal than iterating over everything match self.listeners.poll() {
let out_reach_peer_id = self Async::NotReady => (),
.out_reach_attempts Async::Ready(Some(ListenersEvent::Incoming {
.iter() upgrade,
.find(|(_, a)| a.id == reach_id) listen_addr,
.map(|(p, _)| p.clone()); })) => {
if let Some(peer_id) = out_reach_peer_id { let id = self.active_nodes.add_reach_attempt(upgrade, self.handler_build.new_handler());
let mut attempt = self.out_reach_attempts.remove(&peer_id) self.reach_attempts.other_reach_attempts.push((
.expect("out_reach_peer_id is a key that is grabbed from out_reach_attempts"); id,
ConnectedPoint::Listener {
let num_remain = attempt.next_attempts.len(); listen_addr: listen_addr.clone(),
let failed_addr = attempt.cur_attempted.clone();
if !attempt.next_attempts.is_empty() {
let mut attempt = attempt;
attempt.cur_attempted = attempt.next_attempts.remove(0);
attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) {
Ok(fut) => self.active_nodes.add_reach_attempt(fut, self.handler_build.new_handler()),
Err((_, addr)) => {
let msg = format!("unsupported multiaddr {}", addr);
let fut = future::err(IoError::new(IoErrorKind::Other, msg));
self.active_nodes.add_reach_attempt::<_, _, future::FutureResult<Multiaddr, IoError>, _>(fut, self.handler_build.new_handler())
}, },
}; ));
return Async::Ready(Some(SwarmEvent::IncomingConnection {
self.out_reach_attempts.insert(peer_id.clone(), attempt); listen_addr,
}));
} }
Async::Ready(Some(ListenersEvent::Closed {
return Some(SwarmEvent::DialError { listen_addr,
remain_addrs_attempt: num_remain, listener,
peer_id, result,
multiaddr: failed_addr, })) => {
error, return Async::Ready(Some(SwarmEvent::ListenerClosed {
}); listen_addr,
listener,
result,
}));
}
Async::Ready(None) => unreachable!("The listeners stream never finishes"),
} }
// If this is not an outgoing reach attempt, check the incoming reach attempts. // Poll the existing nodes.
if let Some(in_pos) = self loop {
.other_reach_attempts let (action, out_event);
.iter() match self.active_nodes.poll() {
.position(|i| i.0 == reach_id) Async::NotReady => break,
{ Async::Ready(Some(CollectionEvent::NodeReached(reach_event))) => {
let (_, endpoint) = self.other_reach_attempts.swap_remove(in_pos); let (a, e) = handle_node_reached(&mut self.reach_attempts, reach_event);
match endpoint { action = a;
ConnectedPoint::Dialer { address } => { out_event = e;
return Some(SwarmEvent::UnknownPeerDialError { }
multiaddr: address, Async::Ready(Some(CollectionEvent::ReachError { id, error })) => {
let (a, e) = handle_reach_error(&mut self.reach_attempts, id, error);
action = a;
out_event = e;
}
Async::Ready(Some(CollectionEvent::NodeError {
peer_id,
error,
})) => {
let address = self.reach_attempts.connected_multiaddresses.remove(&peer_id);
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
action = Default::default();
out_event = SwarmEvent::NodeError {
peer_id,
address,
error, error,
}); };
} }
ConnectedPoint::Listener { listen_addr } => { Async::Ready(Some(CollectionEvent::NodeClosed { peer_id })) => {
return Some(SwarmEvent::IncomingConnectionError { listen_addr, error }); let address = self.reach_attempts.connected_multiaddresses.remove(&peer_id);
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
action = Default::default();
out_event = SwarmEvent::NodeClosed { peer_id, address };
} }
Async::Ready(Some(CollectionEvent::NodeEvent { peer_id, event })) => {
action = Default::default();
out_event = SwarmEvent::NodeEvent { peer_id, event };
}
Async::Ready(None) => unreachable!("CollectionStream never ends"),
};
if let Some((peer_id, first, rest)) = action.start_dial_out {
self.start_dial_out(peer_id, first, rest);
} }
if let Some(interrupt) = action.interrupt {
// TODO: improve proof or remove ; this is too complicated right now
self.active_nodes
.interrupt(interrupt)
.expect("interrupt is guaranteed to be gathered from `out_reach_attempts` ;
we insert in out_reach_attempts only when we call \
active_nodes.add_reach_attempt, and we remove only when we call \
interrupt or when a reach attempt succeeds or errors ; therefore the \
out_reach_attempts should always be in sync with the actual \
attempts ; qed");
}
return Async::Ready(Some(out_event));
} }
// The id was neither in the outbound list nor the inbound list. Async::NotReady
panic!("The API of collection guarantees that the id sent back in ReachError events \
(which is where we call handle_reach_error) is one that was passed to \
add_reach_attempt. Whenever we call add_reach_attempt, we also insert \
at the same time an entry either in out_reach_attempts or in \
other_reach_attempts. It is therefore guaranteed that we find back this ID in \
either of these two sets");
} }
} }
/// Internal struct indicating an action to perform of the swarm.
#[derive(Debug, Default)]
#[must_use]
struct ActionItem {
start_dial_out: Option<(PeerId, Multiaddr, Vec<Multiaddr>)>,
interrupt: Option<ReachAttemptId>,
}
/// Handles a node reached event from the collection.
///
/// Returns an event to return from the stream.
///
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
/// > panics will likely happen.
fn handle_node_reached<TTrans, TMuxer, TInEvent, TOutEvent>(
reach_attempts: &mut ReachAttempts,
event: CollectionReachEvent<TInEvent, TOutEvent>
) -> (ActionItem, SwarmEvent<TTrans, TOutEvent>)
where
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
TTrans::Dial: Send + 'static,
TTrans::MultiaddrFuture: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
// We first start looking in the incoming attempts. While this makes the code less optimal,
// it also makes the logic easier.
if let Some(in_pos) = reach_attempts
.other_reach_attempts
.iter()
.position(|i| i.0 == event.reach_attempt_id())
{
let (_, endpoint) = reach_attempts.other_reach_attempts.swap_remove(in_pos);
// Clear the known multiaddress for this peer.
let closed_multiaddr = reach_attempts.connected_multiaddresses.remove(&event.peer_id());
// Cancel any outgoing attempt to this peer.
let action = if let Some(attempt) = reach_attempts.out_reach_attempts.remove(&event.peer_id()) {
debug_assert_ne!(attempt.id, event.reach_attempt_id());
ActionItem {
interrupt: Some(attempt.id),
.. Default::default()
}
} else {
ActionItem::default()
};
let (outcome, peer_id) = event.accept();
if outcome == CollectionNodeAccept::ReplacedExisting {
return (action, SwarmEvent::Replaced {
peer_id,
endpoint,
closed_multiaddr,
});
} else {
return (action, SwarmEvent::Connected { peer_id, endpoint });
}
}
// Otherwise, try for outgoing attempts.
let is_outgoing_and_ok = if let Some(attempt) = reach_attempts.out_reach_attempts.get(event.peer_id()) {
attempt.id == event.reach_attempt_id()
} else {
false
};
// We only remove the attempt from `out_reach_attempts` if it both matches the reach id
// and the expected peer id.
if is_outgoing_and_ok {
let attempt = reach_attempts.out_reach_attempts.remove(event.peer_id())
.expect("is_outgoing_and_ok is true only if reach_attempts.out_reach_attempts.get(event.peer_id()) \
returned Some");
let closed_multiaddr = reach_attempts.connected_multiaddresses
.insert(event.peer_id().clone(), attempt.cur_attempted.clone());
let endpoint = ConnectedPoint::Dialer {
address: attempt.cur_attempted,
};
let (outcome, peer_id) = event.accept();
if outcome == CollectionNodeAccept::ReplacedExisting {
return (Default::default(), SwarmEvent::Replaced {
peer_id,
endpoint,
closed_multiaddr,
});
} else {
return (Default::default(), SwarmEvent::Connected { peer_id, endpoint });
}
}
// If in neither, check outgoing reach attempts again as we may have a public
// key mismatch.
let expected_peer_id = reach_attempts
.out_reach_attempts
.iter()
.find(|(_, a)| a.id == event.reach_attempt_id())
.map(|(p, _)| p.clone());
if let Some(expected_peer_id) = expected_peer_id {
debug_assert_ne!(&expected_peer_id, event.peer_id());
let attempt = reach_attempts.out_reach_attempts.remove(&expected_peer_id)
.expect("expected_peer_id is a key that is grabbed from out_reach_attempts");
let num_remain = attempt.next_attempts.len();
let failed_addr = attempt.cur_attempted.clone();
let peer_id = event.deny();
let action = if !attempt.next_attempts.is_empty() {
let mut attempt = attempt;
let next = attempt.next_attempts.remove(0);
ActionItem {
start_dial_out: Some((expected_peer_id.clone(), next, attempt.next_attempts)),
.. Default::default()
}
} else {
Default::default()
};
return (action, SwarmEvent::PublicKeyMismatch {
remain_addrs_attempt: num_remain,
expected_peer_id,
actual_peer_id: peer_id,
multiaddr: failed_addr,
});
}
// We didn't find any entry in neither the outgoing connections not ingoing connections.
// TODO: improve proof or remove ; this is too complicated right now
panic!("The API of collection guarantees that the id sent back in NodeReached (which is where \
we call handle_node_reached) is one that was passed to add_reach_attempt. Whenever we \
call add_reach_attempt, we also insert at the same time an entry either in \
out_reach_attempts or in other_reach_attempts. It is therefore guaranteed that we \
find back this ID in either of these two sets");
}
/// Handles a reach error event from the collection.
///
/// Optionally returns an event to return from the stream.
///
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
/// > panics will likely happen.
fn handle_reach_error<TTrans, TOutEvent>(
reach_attempts: &mut ReachAttempts,
reach_id: ReachAttemptId,
error: IoError,
) -> (ActionItem, SwarmEvent<TTrans, TOutEvent>)
where TTrans: Transport
{
// Search for the attempt in `out_reach_attempts`.
// TODO: could be more optimal than iterating over everything
let out_reach_peer_id = reach_attempts
.out_reach_attempts
.iter()
.find(|(_, a)| a.id == reach_id)
.map(|(p, _)| p.clone());
if let Some(peer_id) = out_reach_peer_id {
let mut attempt = reach_attempts.out_reach_attempts.remove(&peer_id)
.expect("out_reach_peer_id is a key that is grabbed from out_reach_attempts");
let num_remain = attempt.next_attempts.len();
let failed_addr = attempt.cur_attempted.clone();
let action = if !attempt.next_attempts.is_empty() {
let mut attempt = attempt;
let next_attempt = attempt.next_attempts.remove(0);
ActionItem {
start_dial_out: Some((peer_id.clone(), next_attempt, attempt.next_attempts)),
.. Default::default()
}
} else {
Default::default()
};
return (action, SwarmEvent::DialError {
remain_addrs_attempt: num_remain,
peer_id,
multiaddr: failed_addr,
error,
});
}
// If this is not an outgoing reach attempt, check the incoming reach attempts.
if let Some(in_pos) = reach_attempts
.other_reach_attempts
.iter()
.position(|i| i.0 == reach_id)
{
let (_, endpoint) = reach_attempts.other_reach_attempts.swap_remove(in_pos);
match endpoint {
ConnectedPoint::Dialer { address } => {
return (Default::default(), SwarmEvent::UnknownPeerDialError {
multiaddr: address,
error,
});
}
ConnectedPoint::Listener { listen_addr } => {
return (Default::default(), SwarmEvent::IncomingConnectionError { listen_addr, error });
}
}
}
// The id was neither in the outbound list nor the inbound list.
// TODO: improve proof or remove ; this is too complicated right now
panic!("The API of collection guarantees that the id sent back in ReachError events \
(which is where we call handle_reach_error) is one that was passed to \
add_reach_attempt. Whenever we call add_reach_attempt, we also insert \
at the same time an entry either in out_reach_attempts or in \
other_reach_attempts. It is therefore guaranteed that we find back this ID in \
either of these two sets");
}
/// State of a peer in the system. /// State of a peer in the system.
pub enum Peer<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandlerBuild: 'a> pub enum Peer<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandlerBuild: 'a>
where where
@ -838,6 +979,7 @@ impl<'a, TInEvent, TOutEvent> PeerPendingConnect<'a, TInEvent, TOutEvent> {
pub fn interrupt(self) { pub fn interrupt(self) {
let attempt = self.attempt.remove(); let attempt = self.attempt.remove();
if let Err(_) = self.active_nodes.interrupt(attempt.id) { if let Err(_) = self.active_nodes.interrupt(attempt.id) {
// TODO: improve proof or remove ; this is too complicated right now
panic!("We retreived this attempt.id from out_reach_attempts. We insert in \ panic!("We retreived this attempt.id from out_reach_attempts. We insert in \
out_reach_attempts only at the same time as we call add_reach_attempt. \ out_reach_attempts only at the same time as we call add_reach_attempt. \
Whenever we receive a NodeReached, NodeReplaced or ReachError event, which \ Whenever we receive a NodeReached, NodeReplaced or ReachError event, which \
@ -947,25 +1089,10 @@ where
TInEvent: Send + 'static, TInEvent: Send + 'static,
TOutEvent: Send + 'static, TOutEvent: Send + 'static,
{ {
let future = match self.nodes.transport().clone().dial(first.clone()) { self.nodes.start_dial_out(self.peer_id.clone(), first, rest);
Ok(fut) => fut,
Err(_) => return Err(self),
};
let reach_id = self.nodes.active_nodes.add_reach_attempt(future, self.nodes.handler_build.new_handler());
let former = self.nodes.out_reach_attempts.insert(
self.peer_id.clone(),
OutReachAttempt {
id: reach_id,
cur_attempted: first,
next_attempts: rest,
},
);
debug_assert!(former.is_none());
Ok(PeerPendingConnect { Ok(PeerPendingConnect {
attempt: match self.nodes.out_reach_attempts.entry(self.peer_id) { attempt: match self.nodes.reach_attempts.out_reach_attempts.entry(self.peer_id) {
Entry::Occupied(e) => e, Entry::Occupied(e) => e,
Entry::Vacant(_) => { Entry::Vacant(_) => {
panic!("We called out_reach_attempts.insert with this peer id just above") panic!("We called out_reach_attempts.insert with this peer id just above")
@ -995,85 +1122,8 @@ where
type Item = SwarmEvent<TTrans, TOutEvent>; type Item = SwarmEvent<TTrans, TOutEvent>;
type Error = Void; // TODO: use `!` once stable type Error = Void; // TODO: use `!` once stable
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Start by polling the listeners for events. Ok(self.poll())
match self.listeners.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(Some(ListenersEvent::Incoming {
upgrade,
listen_addr,
}))) => {
let id = self.active_nodes.add_reach_attempt(upgrade, self.handler_build.new_handler());
self.other_reach_attempts.push((
id,
ConnectedPoint::Listener {
listen_addr: listen_addr.clone(),
},
));
return Ok(Async::Ready(Some(SwarmEvent::IncomingConnection {
listen_addr,
})));
}
Ok(Async::Ready(Some(ListenersEvent::Closed {
listen_addr,
listener,
result,
}))) => {
return Ok(Async::Ready(Some(SwarmEvent::ListenerClosed {
listen_addr,
listener,
result,
})));
}
Ok(Async::Ready(None)) => unreachable!("The listeners stream never finishes"),
Err(_) => unreachable!("The listeners stream never errors"), // TODO: remove variant
}
// Poll the existing nodes.
loop {
match self.active_nodes.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(CollectionEvent::NodeReached { peer_id, id }))) => {
let event = self.handle_node_reached(peer_id, id, false);
return Ok(Async::Ready(Some(event)));
}
Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
peer_id,
id,
}))) => {
let event = self.handle_node_reached(peer_id, id, true);
return Ok(Async::Ready(Some(event)));
}
Ok(Async::Ready(Some(CollectionEvent::ReachError { id, error }))) => {
if let Some(event) = self.handle_reach_error(id, error) {
return Ok(Async::Ready(Some(event)));
}
}
Ok(Async::Ready(Some(CollectionEvent::NodeError {
peer_id,
error,
}))) => {
let address = self.connected_multiaddresses.remove(&peer_id);
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
return Ok(Async::Ready(Some(SwarmEvent::NodeError {
peer_id,
address,
error,
})));
}
Ok(Async::Ready(Some(CollectionEvent::NodeClosed { peer_id }))) => {
let address = self.connected_multiaddresses.remove(&peer_id);
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
return Ok(Async::Ready(Some(SwarmEvent::NodeClosed { peer_id, address })));
}
Ok(Async::Ready(Some(CollectionEvent::NodeEvent { peer_id, event }))) => {
return Ok(Async::Ready(Some(SwarmEvent::NodeEvent { peer_id, event })));
}
Ok(Async::Ready(None)) => unreachable!("CollectionStream never ends"),
Err(_) => unreachable!("CollectionStream never errors"),
}
}
Ok(Async::NotReady)
} }
} }