mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-12 01:21:21 +00:00
Add a NodeHandler trait (#495)
* Add a NodeHandler trait * Fix compilation * Some fixes
This commit is contained in:
@ -21,7 +21,8 @@
|
|||||||
use fnv::FnvHashMap;
|
use fnv::FnvHashMap;
|
||||||
use futures::{prelude::*, sync::mpsc, sync::oneshot, task};
|
use futures::{prelude::*, sync::mpsc, sync::oneshot, task};
|
||||||
use muxing::StreamMuxer;
|
use muxing::StreamMuxer;
|
||||||
use nodes::node::{NodeEvent, NodeStream, Substream};
|
use nodes::node::Substream;
|
||||||
|
use nodes::handled_node::{HandledNode, NodeHandler};
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::collections::hash_map::{Entry, OccupiedEntry};
|
use std::collections::hash_map::{Entry, OccupiedEntry};
|
||||||
use std::io::Error as IoError;
|
use std::io::Error as IoError;
|
||||||
@ -49,12 +50,9 @@ use {Multiaddr, PeerId};
|
|||||||
|
|
||||||
/// Implementation of `Stream` that handles a collection of nodes.
|
/// Implementation of `Stream` that handles a collection of nodes.
|
||||||
// TODO: implement Debug
|
// TODO: implement Debug
|
||||||
pub struct CollectionStream<TMuxer, TUserData>
|
pub struct CollectionStream<TInEvent, TOutEvent> {
|
||||||
where
|
|
||||||
TMuxer: StreamMuxer,
|
|
||||||
{
|
|
||||||
/// List of nodes, with a sender allowing to communicate messages.
|
/// List of nodes, with a sender allowing to communicate messages.
|
||||||
nodes: FnvHashMap<PeerId, (ReachAttemptId, mpsc::UnboundedSender<ExtToInMessage>)>,
|
nodes: FnvHashMap<PeerId, (ReachAttemptId, mpsc::UnboundedSender<TInEvent>)>,
|
||||||
/// Known state of a task. Tasks are identified by the reach attempt ID.
|
/// Known state of a task. Tasks are identified by the reach attempt ID.
|
||||||
tasks: FnvHashMap<ReachAttemptId, TaskKnownState>,
|
tasks: FnvHashMap<ReachAttemptId, TaskKnownState>,
|
||||||
/// Identifier for the next task to spawn.
|
/// Identifier for the next task to spawn.
|
||||||
@ -67,18 +65,9 @@ where
|
|||||||
to_notify: Option<task::Task>,
|
to_notify: Option<task::Task>,
|
||||||
|
|
||||||
/// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
|
/// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
|
||||||
events_tx: mpsc::UnboundedSender<(InToExtMessage<TMuxer>, ReachAttemptId)>,
|
events_tx: mpsc::UnboundedSender<(InToExtMessage<TInEvent, TOutEvent>, ReachAttemptId)>,
|
||||||
/// Receiver side for the events.
|
/// Receiver side for the events.
|
||||||
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TMuxer>, ReachAttemptId)>,
|
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TInEvent, TOutEvent>, ReachAttemptId)>,
|
||||||
|
|
||||||
/// Instead of passing directly the user data when opening an outbound substream attempt, we
|
|
||||||
/// store it here and pass a `usize` to the node. This makes it possible to instantly close
|
|
||||||
/// some attempts if necessary.
|
|
||||||
// TODO: use something else than hashmap? we often need to iterate over everything, and a
|
|
||||||
// SmallVec may be better
|
|
||||||
outbound_attempts: FnvHashMap<usize, (PeerId, TUserData)>,
|
|
||||||
/// Identifier for the next entry in `outbound_attempts`.
|
|
||||||
next_outbound_attempt: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// State of a task, as known by the frontend (the `ColletionStream`). Asynchronous compared to
|
/// State of a task, as known by the frontend (the `ColletionStream`). Asynchronous compared to
|
||||||
@ -106,10 +95,7 @@ impl TaskKnownState {
|
|||||||
|
|
||||||
/// Event that can happen on the `CollectionStream`.
|
/// Event that can happen on the `CollectionStream`.
|
||||||
// TODO: implement Debug
|
// TODO: implement Debug
|
||||||
pub enum CollectionEvent<TMuxer, TUserData>
|
pub enum CollectionEvent<TOutEvent> {
|
||||||
where
|
|
||||||
TMuxer: StreamMuxer,
|
|
||||||
{
|
|
||||||
/// A connection to a node has succeeded.
|
/// A connection to a node has succeeded.
|
||||||
NodeReached {
|
NodeReached {
|
||||||
/// Identifier of the node.
|
/// Identifier of the node.
|
||||||
@ -125,8 +111,6 @@ where
|
|||||||
NodeReplaced {
|
NodeReplaced {
|
||||||
/// Identifier of the node.
|
/// Identifier of the node.
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
/// Outbound substream attempts that have been closed in the process.
|
|
||||||
closed_outbound_substreams: Vec<TUserData>,
|
|
||||||
/// Identifier of the reach attempt that succeeded.
|
/// Identifier of the reach attempt that succeeded.
|
||||||
id: ReachAttemptId,
|
id: ReachAttemptId,
|
||||||
},
|
},
|
||||||
@ -146,8 +130,6 @@ where
|
|||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
/// The error that happened.
|
/// The error that happened.
|
||||||
error: IoError,
|
error: IoError,
|
||||||
/// Pending outbound substreams that were cancelled.
|
|
||||||
closed_outbound_substreams: Vec<TUserData>,
|
|
||||||
},
|
},
|
||||||
|
|
||||||
/// An error happened on the future that was trying to reach a node.
|
/// An error happened on the future that was trying to reach a node.
|
||||||
@ -158,46 +140,12 @@ where
|
|||||||
error: IoError,
|
error: IoError,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// The multiaddress of the node has been resolved.
|
/// A node has produced an event.
|
||||||
NodeMultiaddr {
|
NodeEvent {
|
||||||
/// Identifier of the node.
|
/// Identifier of the node.
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
/// Address that has been resolved, or error that occured on the substream.
|
/// The produced event.
|
||||||
address: Result<Multiaddr, IoError>,
|
event: TOutEvent,
|
||||||
},
|
|
||||||
|
|
||||||
/// A new inbound substream arrived.
|
|
||||||
InboundSubstream {
|
|
||||||
/// Identifier of the node.
|
|
||||||
peer_id: PeerId,
|
|
||||||
/// The newly-opened substream.
|
|
||||||
substream: Substream<TMuxer>,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// An outbound substream has successfully been opened.
|
|
||||||
OutboundSubstream {
|
|
||||||
/// Identifier of the node.
|
|
||||||
peer_id: PeerId,
|
|
||||||
/// Identifier of the substream. Same as what was returned by `open_substream`.
|
|
||||||
user_data: TUserData,
|
|
||||||
/// The newly-opened substream.
|
|
||||||
substream: Substream<TMuxer>,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// The inbound side of a muxer has been gracefully closed. No more inbound substreams will
|
|
||||||
/// be produced.
|
|
||||||
InboundClosed {
|
|
||||||
/// Identifier of the node.
|
|
||||||
peer_id: PeerId,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// An outbound substream couldn't be opened because the muxer is no longer capable of opening
|
|
||||||
/// more substreams.
|
|
||||||
OutboundClosed {
|
|
||||||
/// Identifier of the node.
|
|
||||||
peer_id: PeerId,
|
|
||||||
/// Identifier of the substream. Same as what was returned by `open_substream`.
|
|
||||||
user_data: TUserData,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -205,10 +153,7 @@ where
|
|||||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
pub struct ReachAttemptId(usize);
|
pub struct ReachAttemptId(usize);
|
||||||
|
|
||||||
impl<TMuxer, TUserData> CollectionStream<TMuxer, TUserData>
|
impl<TInEvent, TOutEvent> CollectionStream<TInEvent, TOutEvent> {
|
||||||
where
|
|
||||||
TMuxer: StreamMuxer,
|
|
||||||
{
|
|
||||||
/// Creates a new empty collection.
|
/// Creates a new empty collection.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
@ -222,8 +167,6 @@ where
|
|||||||
to_notify: None,
|
to_notify: None,
|
||||||
events_tx,
|
events_tx,
|
||||||
events_rx,
|
events_rx,
|
||||||
outbound_attempts: Default::default(),
|
|
||||||
next_outbound_attempt: 0,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -231,14 +174,17 @@ where
|
|||||||
///
|
///
|
||||||
/// This method spawns a task dedicated to resolving this future and processing the node's
|
/// This method spawns a task dedicated to resolving this future and processing the node's
|
||||||
/// events.
|
/// events.
|
||||||
pub fn add_reach_attempt<TFut, TAddrFut>(&mut self, future: TFut) -> ReachAttemptId
|
pub fn add_reach_attempt<TFut, TMuxer, TAddrFut, THandler>(&mut self, future: TFut, handler: THandler)
|
||||||
|
-> ReachAttemptId
|
||||||
where
|
where
|
||||||
TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError> + Send + 'static,
|
TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError> + Send + 'static,
|
||||||
TMuxer: Send + Sync + 'static,
|
|
||||||
TMuxer::OutboundSubstream: Send,
|
|
||||||
TMuxer::Substream: Send,
|
|
||||||
TAddrFut: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
|
TAddrFut: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
|
||||||
TUserData: Send + 'static,
|
THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
|
||||||
|
TInEvent: Send + 'static,
|
||||||
|
TOutEvent: Send + 'static,
|
||||||
|
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
|
||||||
|
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
|
||||||
|
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
|
||||||
{
|
{
|
||||||
let reach_attempt_id = self.next_task_id;
|
let reach_attempt_id = self.next_task_id;
|
||||||
self.next_task_id.0 += 1;
|
self.next_task_id.0 += 1;
|
||||||
@ -255,6 +201,7 @@ where
|
|||||||
inner: NodeTaskInner::Future {
|
inner: NodeTaskInner::Future {
|
||||||
future,
|
future,
|
||||||
interrupt: interrupt_rx,
|
interrupt: interrupt_rx,
|
||||||
|
handler: Some(handler),
|
||||||
},
|
},
|
||||||
events_tx: self.events_tx.clone(),
|
events_tx: self.events_tx.clone(),
|
||||||
id: reach_attempt_id,
|
id: reach_attempt_id,
|
||||||
@ -294,20 +241,24 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sends an event to all nodes.
|
||||||
|
pub fn broadcast_event(&mut self, event: &TInEvent)
|
||||||
|
where TInEvent: Clone,
|
||||||
|
{
|
||||||
|
for &(_, ref sender) in self.nodes.values() {
|
||||||
|
let _ = sender.unbounded_send(event.clone()); // TODO: unwrap
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Grants access to an object that allows controlling a node of the collection.
|
/// Grants access to an object that allows controlling a node of the collection.
|
||||||
///
|
///
|
||||||
/// Returns `None` if we don't have a connection to this peer.
|
/// Returns `None` if we don't have a connection to this peer.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn peer_mut(&mut self, id: &PeerId) -> Option<PeerMut<TUserData>>
|
pub fn peer_mut(&mut self, id: &PeerId) -> Option<PeerMut<TInEvent>> {
|
||||||
where
|
|
||||||
TUserData: Send + 'static,
|
|
||||||
{
|
|
||||||
match self.nodes.entry(id.clone()) {
|
match self.nodes.entry(id.clone()) {
|
||||||
Entry::Occupied(inner) => Some(PeerMut {
|
Entry::Occupied(inner) => Some(PeerMut {
|
||||||
inner,
|
inner,
|
||||||
tasks: &mut self.tasks,
|
tasks: &mut self.tasks,
|
||||||
next_outbound_attempt: &mut self.next_outbound_attempt,
|
|
||||||
outbound_attempts: &mut self.outbound_attempts,
|
|
||||||
}),
|
}),
|
||||||
Entry::Vacant(_) => None,
|
Entry::Vacant(_) => None,
|
||||||
}
|
}
|
||||||
@ -331,43 +282,25 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Access to a peer in the collection.
|
/// Access to a peer in the collection.
|
||||||
pub struct PeerMut<'a, TUserData>
|
pub struct PeerMut<'a, TInEvent: 'a> {
|
||||||
where
|
inner: OccupiedEntry<'a, PeerId, (ReachAttemptId, mpsc::UnboundedSender<TInEvent>)>,
|
||||||
TUserData: Send + 'static,
|
|
||||||
{
|
|
||||||
next_outbound_attempt: &'a mut usize,
|
|
||||||
outbound_attempts: &'a mut FnvHashMap<usize, (PeerId, TUserData)>,
|
|
||||||
inner: OccupiedEntry<'a, PeerId, (ReachAttemptId, mpsc::UnboundedSender<ExtToInMessage>)>,
|
|
||||||
tasks: &'a mut FnvHashMap<ReachAttemptId, TaskKnownState>,
|
tasks: &'a mut FnvHashMap<ReachAttemptId, TaskKnownState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TUserData> PeerMut<'a, TUserData>
|
impl<'a, TInEvent> PeerMut<'a, TInEvent> {
|
||||||
where
|
/// Sends an event to the given node.
|
||||||
TUserData: Send + 'static,
|
#[inline]
|
||||||
{
|
pub fn send_event(&mut self, event: TInEvent) {
|
||||||
/// Starts the process of opening a new outbound substream towards the peer.
|
// It is possible that the sender is closed if the task has already finished but we
|
||||||
pub fn open_substream(&mut self, user_data: TUserData) {
|
// haven't been polled in the meanwhile.
|
||||||
let id = *self.next_outbound_attempt;
|
let _ = self.inner.get_mut().1.unbounded_send(event);
|
||||||
*self.next_outbound_attempt += 1;
|
|
||||||
|
|
||||||
self.outbound_attempts
|
|
||||||
.insert(id, (self.inner.key().clone(), user_data));
|
|
||||||
|
|
||||||
let _ = self
|
|
||||||
.inner
|
|
||||||
.get_mut()
|
|
||||||
.1
|
|
||||||
.unbounded_send(ExtToInMessage::OpenSubstream(id));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Closes the connections to this node.
|
/// Closes the connections to this node.
|
||||||
///
|
///
|
||||||
/// This cancels all the attempted outgoing substream attempts, and returns them.
|
/// No further event will be generated for this node.
|
||||||
///
|
pub fn close(self) {
|
||||||
/// No event will be generated for this node.
|
|
||||||
pub fn close(self) -> Vec<TUserData> {
|
|
||||||
let (peer_id, (task_id, _)) = self.inner.remove_entry();
|
let (peer_id, (task_id, _)) = self.inner.remove_entry();
|
||||||
let user_datas = extract_from_attempt(self.outbound_attempts, &peer_id);
|
|
||||||
// Set the task to `Interrupted` so that we ignore further messages from this closed node.
|
// Set the task to `Interrupted` so that we ignore further messages from this closed node.
|
||||||
match self.tasks.insert(task_id, TaskKnownState::Interrupted) {
|
match self.tasks.insert(task_id, TaskKnownState::Interrupted) {
|
||||||
Some(TaskKnownState::Connected(ref p)) if p == &peer_id => (),
|
Some(TaskKnownState::Connected(ref p)) if p == &peer_id => (),
|
||||||
@ -382,36 +315,11 @@ where
|
|||||||
only when we remove from self.nodes at the same time.")
|
only when we remove from self.nodes at the same time.")
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
user_datas
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract from the hashmap the entries matching `node`.
|
impl<TInEvent, TOutEvent> Stream for CollectionStream<TInEvent, TOutEvent> {
|
||||||
fn extract_from_attempt<TUserData>(
|
type Item = CollectionEvent<TOutEvent>;
|
||||||
outbound_attempts: &mut FnvHashMap<usize, (PeerId, TUserData)>,
|
|
||||||
node: &PeerId,
|
|
||||||
) -> Vec<TUserData> {
|
|
||||||
let to_remove: Vec<usize> = outbound_attempts
|
|
||||||
.iter()
|
|
||||||
.filter(|(_, &(ref key, _))| key == node)
|
|
||||||
.map(|(&k, _)| k)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let mut user_datas = Vec::with_capacity(to_remove.len());
|
|
||||||
for to_remove in to_remove {
|
|
||||||
let (_, user_data) = outbound_attempts.remove(&to_remove)
|
|
||||||
.expect("The elements in to_remove were found by iterating once over the hashmap and \
|
|
||||||
are therefore known to be valid and unique");
|
|
||||||
user_datas.push(user_data);
|
|
||||||
}
|
|
||||||
user_datas
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<TMuxer, TUserData> Stream for CollectionStream<TMuxer, TUserData>
|
|
||||||
where
|
|
||||||
TMuxer: StreamMuxer,
|
|
||||||
{
|
|
||||||
type Item = CollectionEvent<TMuxer, TUserData>;
|
|
||||||
type Error = Void; // TODO: use ! once stable
|
type Error = Void; // TODO: use ! once stable
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
@ -433,67 +341,10 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
match event {
|
Ok(Async::Ready(Some(CollectionEvent::NodeEvent {
|
||||||
NodeEvent::Multiaddr(address) => {
|
peer_id,
|
||||||
Ok(Async::Ready(Some(CollectionEvent::NodeMultiaddr {
|
event,
|
||||||
peer_id,
|
})))
|
||||||
address,
|
|
||||||
})))
|
|
||||||
}
|
|
||||||
NodeEvent::InboundSubstream { substream } => {
|
|
||||||
Ok(Async::Ready(Some(CollectionEvent::InboundSubstream {
|
|
||||||
peer_id,
|
|
||||||
substream,
|
|
||||||
})))
|
|
||||||
}
|
|
||||||
NodeEvent::OutboundSubstream {
|
|
||||||
user_data,
|
|
||||||
substream,
|
|
||||||
} => {
|
|
||||||
let (_peer_id, actual_data) = self
|
|
||||||
.outbound_attempts
|
|
||||||
.remove(&user_data)
|
|
||||||
.expect("We insert a unique usize in outbound_attempts at the \
|
|
||||||
same time as we ask the node to open a substream with \
|
|
||||||
this usize. The API of the node is guaranteed to produce \
|
|
||||||
the value we passed when the substream is actually \
|
|
||||||
opened. The only other places where we remove from \
|
|
||||||
outbound_attempts are if the outbound failed, or if the
|
|
||||||
node's task errored or was closed. If the node's task
|
|
||||||
is closed by us, we set its state to `Interrupted` so
|
|
||||||
that event that it produces are not processed.");
|
|
||||||
debug_assert_eq!(_peer_id, peer_id);
|
|
||||||
Ok(Async::Ready(Some(CollectionEvent::OutboundSubstream {
|
|
||||||
peer_id,
|
|
||||||
user_data: actual_data,
|
|
||||||
substream,
|
|
||||||
})))
|
|
||||||
}
|
|
||||||
NodeEvent::InboundClosed => {
|
|
||||||
Ok(Async::Ready(Some(CollectionEvent::InboundClosed {
|
|
||||||
peer_id,
|
|
||||||
})))
|
|
||||||
}
|
|
||||||
NodeEvent::OutboundClosed { user_data } => {
|
|
||||||
let (_peer_id, actual_data) = self
|
|
||||||
.outbound_attempts
|
|
||||||
.remove(&user_data)
|
|
||||||
.expect("We insert a unique usize in outbound_attempts at the \
|
|
||||||
same time as we ask the node to open a substream with \
|
|
||||||
this usize. The API of the node is guaranteed to produce \
|
|
||||||
the value we passed when the substream is actually \
|
|
||||||
opened. The only other places where we remove from \
|
|
||||||
outbound_attempts are if the outbound succeeds, or if the
|
|
||||||
node's task errored or was closed. If the node's task
|
|
||||||
is closed by us, we set its state to `Interrupted` so
|
|
||||||
that event that it produces are not processed.");
|
|
||||||
debug_assert_eq!(_peer_id, peer_id);
|
|
||||||
Ok(Async::Ready(Some(CollectionEvent::OutboundClosed {
|
|
||||||
peer_id,
|
|
||||||
user_data: actual_data,
|
|
||||||
})))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some((InToExtMessage::NodeReached(peer_id, sender), task_id)))) => {
|
Ok(Async::Ready(Some((InToExtMessage::NodeReached(peer_id, sender), task_id)))) => {
|
||||||
{
|
{
|
||||||
@ -523,13 +374,11 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
let replaced_node = self.nodes.insert(peer_id.clone(), (task_id, sender));
|
let replaced_node = self.nodes.insert(peer_id.clone(), (task_id, sender));
|
||||||
let user_datas = extract_from_attempt(&mut self.outbound_attempts, &peer_id);
|
|
||||||
if let Some(replaced_node) = replaced_node {
|
if let Some(replaced_node) = replaced_node {
|
||||||
let old = self.tasks.insert(replaced_node.0, TaskKnownState::Interrupted);
|
let old = self.tasks.insert(replaced_node.0, TaskKnownState::Interrupted);
|
||||||
debug_assert_eq!(old.map(|s| s.is_pending()), Some(false));
|
debug_assert_eq!(old.map(|s| s.is_pending()), Some(false));
|
||||||
Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
|
Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
|
||||||
peer_id,
|
peer_id,
|
||||||
closed_outbound_substreams: user_datas,
|
|
||||||
id: task_id,
|
id: task_id,
|
||||||
})))
|
})))
|
||||||
} else {
|
} else {
|
||||||
@ -553,9 +402,6 @@ where
|
|||||||
|
|
||||||
let val = self.nodes.remove(&peer_id);
|
let val = self.nodes.remove(&peer_id);
|
||||||
debug_assert!(val.is_some());
|
debug_assert!(val.is_some());
|
||||||
debug_assert!(
|
|
||||||
extract_from_attempt(&mut self.outbound_attempts, &peer_id).is_empty()
|
|
||||||
);
|
|
||||||
Ok(Async::Ready(Some(CollectionEvent::NodeClosed { peer_id })))
|
Ok(Async::Ready(Some(CollectionEvent::NodeClosed { peer_id })))
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some((InToExtMessage::NodeError(err), task_id)))) => {
|
Ok(Async::Ready(Some((InToExtMessage::NodeError(err), task_id)))) => {
|
||||||
@ -572,11 +418,9 @@ where
|
|||||||
|
|
||||||
let val = self.nodes.remove(&peer_id);
|
let val = self.nodes.remove(&peer_id);
|
||||||
debug_assert!(val.is_some());
|
debug_assert!(val.is_some());
|
||||||
let user_datas = extract_from_attempt(&mut self.outbound_attempts, &peer_id);
|
|
||||||
Ok(Async::Ready(Some(CollectionEvent::NodeError {
|
Ok(Async::Ready(Some(CollectionEvent::NodeError {
|
||||||
peer_id,
|
peer_id,
|
||||||
error: err,
|
error: err,
|
||||||
closed_outbound_substreams: user_datas,
|
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some((InToExtMessage::ReachError(err), task_id)))) => {
|
Ok(Async::Ready(Some((InToExtMessage::ReachError(err), task_id)))) => {
|
||||||
@ -610,45 +454,37 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Message to transmit from the public API to a task.
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
enum ExtToInMessage {
|
|
||||||
/// A new substream shall be opened.
|
|
||||||
OpenSubstream(usize),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Message to transmit from a task to the public API.
|
/// Message to transmit from a task to the public API.
|
||||||
enum InToExtMessage<TMuxer>
|
enum InToExtMessage<TInEvent, TOutEvent> {
|
||||||
where
|
|
||||||
TMuxer: StreamMuxer,
|
|
||||||
{
|
|
||||||
/// A connection to a node has succeeded.
|
/// A connection to a node has succeeded.
|
||||||
/// Closing the returned sender will end the task.
|
/// Closing the returned sender will end the task.
|
||||||
NodeReached(PeerId, mpsc::UnboundedSender<ExtToInMessage>),
|
NodeReached(PeerId, mpsc::UnboundedSender<TInEvent>),
|
||||||
NodeClosed,
|
NodeClosed,
|
||||||
NodeError(IoError),
|
NodeError(IoError),
|
||||||
ReachError(IoError),
|
ReachError(IoError),
|
||||||
/// An event from the node.
|
/// An event from the node.
|
||||||
NodeEvent(NodeEvent<TMuxer, usize>),
|
NodeEvent(TOutEvent),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Implementation of `Future` that handles a single node, and all the communications between
|
/// Implementation of `Future` that handles a single node, and all the communications between
|
||||||
/// the various components of the `CollectionStream`.
|
/// the various components of the `CollectionStream`.
|
||||||
struct NodeTask<TFut, TMuxer, TAddrFut>
|
struct NodeTask<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent>
|
||||||
where
|
where
|
||||||
TMuxer: StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
|
THandler: NodeHandler<Substream<TMuxer>>,
|
||||||
{
|
{
|
||||||
/// Sender to transmit events to the outside.
|
/// Sender to transmit events to the outside.
|
||||||
events_tx: mpsc::UnboundedSender<(InToExtMessage<TMuxer>, ReachAttemptId)>,
|
events_tx: mpsc::UnboundedSender<(InToExtMessage<TInEvent, TOutEvent>, ReachAttemptId)>,
|
||||||
/// Inner state of the `NodeTask`.
|
/// Inner state of the `NodeTask`.
|
||||||
inner: NodeTaskInner<TFut, TMuxer, TAddrFut>,
|
inner: NodeTaskInner<TFut, TMuxer, TAddrFut, THandler, TInEvent>,
|
||||||
/// Identifier of the attempt.
|
/// Identifier of the attempt.
|
||||||
id: ReachAttemptId,
|
id: ReachAttemptId,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum NodeTaskInner<TFut, TMuxer, TAddrFut>
|
enum NodeTaskInner<TFut, TMuxer, TAddrFut, THandler, TInEvent>
|
||||||
where
|
where
|
||||||
TMuxer: StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
|
THandler: NodeHandler<Substream<TMuxer>>,
|
||||||
{
|
{
|
||||||
/// Future to resolve to connect to the node.
|
/// Future to resolve to connect to the node.
|
||||||
Future {
|
Future {
|
||||||
@ -656,23 +492,26 @@ where
|
|||||||
future: TFut,
|
future: TFut,
|
||||||
/// Allows interrupting the attempt.
|
/// Allows interrupting the attempt.
|
||||||
interrupt: oneshot::Receiver<()>,
|
interrupt: oneshot::Receiver<()>,
|
||||||
|
/// The handler that will be used to build the `HandledNode`.
|
||||||
|
handler: Option<THandler>,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Fully functional node.
|
/// Fully functional node.
|
||||||
Node {
|
Node {
|
||||||
/// The object that is actually processing things.
|
/// The object that is actually processing things.
|
||||||
/// This is an `Option` because we need to be able to extract it.
|
node: HandledNode<TMuxer, TAddrFut, THandler>,
|
||||||
node: NodeStream<TMuxer, TAddrFut, usize>,
|
/// Receiving end for events sent from the main `CollectionStream`. `None` if closed.
|
||||||
/// Receiving end for events sent from the main `CollectionStream`.
|
in_events_rx: Option<mpsc::UnboundedReceiver<TInEvent>>,
|
||||||
in_events_rx: mpsc::UnboundedReceiver<ExtToInMessage>,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TFut, TMuxer, TAddrFut> Future for NodeTask<TFut, TMuxer, TAddrFut>
|
impl<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent> Future for
|
||||||
|
NodeTask<TFut, TMuxer, TAddrFut, THandler, TInEvent, TOutEvent>
|
||||||
where
|
where
|
||||||
TMuxer: StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError>,
|
TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError>,
|
||||||
TAddrFut: Future<Item = Multiaddr, Error = IoError>,
|
TAddrFut: Future<Item = Multiaddr, Error = IoError>,
|
||||||
|
THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Item = ();
|
||||||
type Error = ();
|
type Error = ();
|
||||||
@ -685,6 +524,7 @@ where
|
|||||||
let new_state = if let NodeTaskInner::Future {
|
let new_state = if let NodeTaskInner::Future {
|
||||||
ref mut future,
|
ref mut future,
|
||||||
ref mut interrupt,
|
ref mut interrupt,
|
||||||
|
ref mut handler,
|
||||||
} = self.inner
|
} = self.inner
|
||||||
{
|
{
|
||||||
match interrupt.poll() {
|
match interrupt.poll() {
|
||||||
@ -698,9 +538,12 @@ where
|
|||||||
let event = InToExtMessage::NodeReached(peer_id, sender);
|
let event = InToExtMessage::NodeReached(peer_id, sender);
|
||||||
let _ = self.events_tx.unbounded_send((event, self.id));
|
let _ = self.events_tx.unbounded_send((event, self.id));
|
||||||
|
|
||||||
|
let handler = handler.take()
|
||||||
|
.expect("The handler is only extracted right before we switch state");
|
||||||
|
|
||||||
Some(NodeTaskInner::Node {
|
Some(NodeTaskInner::Node {
|
||||||
node: NodeStream::new(muxer, addr_fut),
|
node: HandledNode::new(muxer, addr_fut, handler),
|
||||||
in_events_rx: rx,
|
in_events_rx: Some(rx),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => {
|
||||||
@ -728,25 +571,21 @@ where
|
|||||||
} = self.inner
|
} = self.inner
|
||||||
{
|
{
|
||||||
// Start by handling commands received from the outside of the task.
|
// Start by handling commands received from the outside of the task.
|
||||||
loop {
|
if let Some(mut local_rx) = in_events_rx.take() {
|
||||||
match in_events_rx.poll() {
|
*in_events_rx = loop {
|
||||||
Ok(Async::Ready(Some(ExtToInMessage::OpenSubstream(user_data)))) => match node
|
match local_rx.poll() {
|
||||||
.open_substream(user_data)
|
Ok(Async::Ready(Some(event))) => {
|
||||||
{
|
node.inject_event(event);
|
||||||
Ok(()) => (),
|
},
|
||||||
Err(user_data) => {
|
Ok(Async::Ready(None)) => {
|
||||||
let event =
|
// Node closed by the external API ; start shutdown process.
|
||||||
InToExtMessage::NodeEvent(NodeEvent::OutboundClosed { user_data });
|
node.shutdown();
|
||||||
let _ = self.events_tx.unbounded_send((event, self.id));
|
break None;
|
||||||
}
|
}
|
||||||
},
|
Ok(Async::NotReady) => break Some(local_rx),
|
||||||
Ok(Async::Ready(None)) => {
|
Err(()) => unreachable!("An unbounded receiver never errors"),
|
||||||
// Node closed by the external API ; end the task
|
|
||||||
return Ok(Async::Ready(()));
|
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => break,
|
};
|
||||||
Err(()) => unreachable!("An unbounded receiver never errors"),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process the node.
|
// Process the node.
|
||||||
|
261
core/src/nodes/handled_node.rs
Normal file
261
core/src/nodes/handled_node.rs
Normal file
@ -0,0 +1,261 @@
|
|||||||
|
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
use muxing::StreamMuxer;
|
||||||
|
use nodes::node::{NodeEvent, NodeStream, Substream};
|
||||||
|
use futures::prelude::*;
|
||||||
|
use std::io::Error as IoError;
|
||||||
|
use Multiaddr;
|
||||||
|
|
||||||
|
/// Handler for the substreams of a node.
|
||||||
|
///
|
||||||
|
/// > Note: When implementing the various methods, don't forget that you have to register the
|
||||||
|
/// > task that was the latest to poll and notify it.
|
||||||
|
pub trait NodeHandler<TSubstream> {
|
||||||
|
/// Custom event that can be received from the outside.
|
||||||
|
type InEvent;
|
||||||
|
/// Custom event that can be produced by the handler and that will be returned by the swarm.
|
||||||
|
type OutEvent;
|
||||||
|
/// Information about a substream. Can be sent to the handler through a `NodeHandlerEndpoint`,
|
||||||
|
/// and will be passed back in `inject_substream` or `inject_outbound_closed`.
|
||||||
|
type OutboundOpenInfo;
|
||||||
|
|
||||||
|
/// Sends a new substream to the handler.
|
||||||
|
///
|
||||||
|
/// The handler is responsible for upgrading the substream to whatever protocol it wants.
|
||||||
|
fn inject_substream(&mut self, substream: TSubstream, endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>);
|
||||||
|
|
||||||
|
/// Indicates the handler that the inbound part of the muxer has been closed, and that
|
||||||
|
/// therefore no more inbound substream will be produced.
|
||||||
|
fn inject_inbound_closed(&mut self);
|
||||||
|
|
||||||
|
/// Indicates the handler that an outbound substream failed to open because the outbound
|
||||||
|
/// part of the muxer has been closed.
|
||||||
|
fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo);
|
||||||
|
|
||||||
|
/// Indicates the handler that the multiaddr future has resolved.
|
||||||
|
fn inject_multiaddr(&mut self, multiaddr: Result<Multiaddr, IoError>);
|
||||||
|
|
||||||
|
/// Injects an event coming from the outside in the handler.
|
||||||
|
fn inject_event(&mut self, event: Self::InEvent);
|
||||||
|
|
||||||
|
/// Indicates the node that it should shut down. After that, it is expected that `poll()`
|
||||||
|
/// returns `Ready(None)` as soon as possible.
|
||||||
|
///
|
||||||
|
/// This method allows an implementation to perform a graceful shutdown of the substreams, and
|
||||||
|
/// send back various events.
|
||||||
|
fn shutdown(&mut self);
|
||||||
|
|
||||||
|
/// Should behave like `Stream::poll()`. Should close if no more event can be produced and the
|
||||||
|
/// node should be closed.
|
||||||
|
fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>>, IoError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Endpoint for a received substream.
|
||||||
|
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||||
|
pub enum NodeHandlerEndpoint<TOutboundOpenInfo> {
|
||||||
|
Dialer(TOutboundOpenInfo),
|
||||||
|
Listener,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Event produces by a handler.
|
||||||
|
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||||
|
pub enum NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
|
||||||
|
/// Require a new outbound substream to be opened with the remote.
|
||||||
|
OutboundSubstreamRequest(TOutboundOpenInfo),
|
||||||
|
|
||||||
|
/// Other event.
|
||||||
|
Custom(TCustom),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Event produces by a handler.
|
||||||
|
impl<TOutboundOpenInfo, TCustom> NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
|
||||||
|
/// If this is `OutboundSubstreamRequest`, maps the content to something else.
|
||||||
|
#[inline]
|
||||||
|
pub fn map_outbound_open_info<F, I>(self, map: F) -> NodeHandlerEvent<I, TCustom>
|
||||||
|
where F: FnOnce(TOutboundOpenInfo) -> I
|
||||||
|
{
|
||||||
|
match self {
|
||||||
|
NodeHandlerEvent::OutboundSubstreamRequest(val) => {
|
||||||
|
NodeHandlerEvent::OutboundSubstreamRequest(map(val))
|
||||||
|
},
|
||||||
|
NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(val),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// If this is `Custom`, maps the content to something else.
|
||||||
|
#[inline]
|
||||||
|
pub fn map_custom<F, I>(self, map: F) -> NodeHandlerEvent<TOutboundOpenInfo, I>
|
||||||
|
where F: FnOnce(TCustom) -> I
|
||||||
|
{
|
||||||
|
match self {
|
||||||
|
NodeHandlerEvent::OutboundSubstreamRequest(val) => {
|
||||||
|
NodeHandlerEvent::OutboundSubstreamRequest(val)
|
||||||
|
},
|
||||||
|
NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(map(val)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A node combined with an implementation of `NodeHandler`.
|
||||||
|
// TODO: impl Debug
|
||||||
|
pub struct HandledNode<TMuxer, TAddrFut, THandler>
|
||||||
|
where
|
||||||
|
TMuxer: StreamMuxer,
|
||||||
|
THandler: NodeHandler<Substream<TMuxer>>,
|
||||||
|
{
|
||||||
|
/// Node that handles the muxing. Can be `None` if the handled node is shutting down.
|
||||||
|
node: Option<NodeStream<TMuxer, TAddrFut, THandler::OutboundOpenInfo>>,
|
||||||
|
/// Handler that processes substreams.
|
||||||
|
handler: THandler,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TMuxer, TAddrFut, THandler> HandledNode<TMuxer, TAddrFut, THandler>
|
||||||
|
where
|
||||||
|
TMuxer: StreamMuxer,
|
||||||
|
THandler: NodeHandler<Substream<TMuxer>>,
|
||||||
|
TAddrFut: Future<Item = Multiaddr, Error = IoError>,
|
||||||
|
{
|
||||||
|
/// Builds a new `HandledNode`.
|
||||||
|
#[inline]
|
||||||
|
pub fn new(muxer: TMuxer, multiaddr_future: TAddrFut, handler: THandler) -> Self {
|
||||||
|
HandledNode {
|
||||||
|
node: Some(NodeStream::new(muxer, multiaddr_future)),
|
||||||
|
handler,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Injects an event to the handler.
|
||||||
|
#[inline]
|
||||||
|
pub fn inject_event(&mut self, event: THandler::InEvent) {
|
||||||
|
self.handler.inject_event(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if the inbound channel of the muxer is closed.
|
||||||
|
///
|
||||||
|
/// If `true` is returned, then no more inbound substream will be received.
|
||||||
|
#[inline]
|
||||||
|
pub fn is_inbound_closed(&self) -> bool {
|
||||||
|
self.node.as_ref().map(|n| n.is_inbound_closed()).unwrap_or(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if the outbound channel of the muxer is closed.
|
||||||
|
///
|
||||||
|
/// If `true` is returned, then no more outbound substream will be opened.
|
||||||
|
#[inline]
|
||||||
|
pub fn is_outbound_closed(&self) -> bool {
|
||||||
|
self.node.as_ref().map(|n| n.is_outbound_closed()).unwrap_or(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if the handled node is in the process of shutting down.
|
||||||
|
#[inline]
|
||||||
|
pub fn is_shutting_down(&self) -> bool {
|
||||||
|
self.node.is_none()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Indicates the handled node that it should shut down. After calling this method, the
|
||||||
|
/// `Stream` will end in the not-so-distant future.
|
||||||
|
///
|
||||||
|
/// After this method returns, `is_shutting_down()` should return true.
|
||||||
|
pub fn shutdown(&mut self) {
|
||||||
|
if let Some(node) = self.node.take() {
|
||||||
|
for user_data in node.close() {
|
||||||
|
self.handler.inject_outbound_closed(user_data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.handler.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TMuxer, TAddrFut, THandler> Stream for HandledNode<TMuxer, TAddrFut, THandler>
|
||||||
|
where
|
||||||
|
TMuxer: StreamMuxer,
|
||||||
|
THandler: NodeHandler<Substream<TMuxer>>,
|
||||||
|
TAddrFut: Future<Item = Multiaddr, Error = IoError>,
|
||||||
|
{
|
||||||
|
type Item = THandler::OutEvent;
|
||||||
|
type Error = IoError;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
|
// We extract the value from `self.node` and put it back in place if `NotReady`.
|
||||||
|
if let Some(mut node) = self.node.take() {
|
||||||
|
loop {
|
||||||
|
match node.poll() {
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
self.node = Some(node);
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
Ok(Async::Ready(Some(NodeEvent::InboundSubstream { substream }))) => {
|
||||||
|
self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener);
|
||||||
|
},
|
||||||
|
Ok(Async::Ready(Some(NodeEvent::OutboundSubstream { user_data, substream }))) => {
|
||||||
|
let endpoint = NodeHandlerEndpoint::Dialer(user_data);
|
||||||
|
self.handler.inject_substream(substream, endpoint);
|
||||||
|
},
|
||||||
|
Ok(Async::Ready(None)) => {
|
||||||
|
// Breaking from the loop without putting back the node.
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
Ok(Async::Ready(Some(NodeEvent::Multiaddr(result)))) => {
|
||||||
|
self.handler.inject_multiaddr(result);
|
||||||
|
},
|
||||||
|
Ok(Async::Ready(Some(NodeEvent::OutboundClosed { user_data }))) => {
|
||||||
|
self.handler.inject_outbound_closed(user_data);
|
||||||
|
},
|
||||||
|
Ok(Async::Ready(Some(NodeEvent::InboundClosed))) => {
|
||||||
|
self.handler.inject_inbound_closed();
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
// Breaking from the loop without putting back the node.
|
||||||
|
return Err(err);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match self.handler.poll() {
|
||||||
|
Ok(Async::NotReady) => break,
|
||||||
|
Ok(Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(user_data)))) => {
|
||||||
|
if let Some(node) = self.node.as_mut() {
|
||||||
|
match node.open_substream(user_data) {
|
||||||
|
Ok(()) => (),
|
||||||
|
Err(user_data) => self.handler.inject_outbound_closed(user_data),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.handler.inject_outbound_closed(user_data);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Ok(Async::Ready(Some(NodeHandlerEvent::Custom(event)))) => {
|
||||||
|
return Ok(Async::Ready(Some(event)));
|
||||||
|
},
|
||||||
|
Ok(Async::Ready(None)) => {
|
||||||
|
return Ok(Async::Ready(None));
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
return Err(err);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
@ -19,6 +19,7 @@
|
|||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
pub mod collection;
|
pub mod collection;
|
||||||
|
pub mod handled_node;
|
||||||
pub mod listeners;
|
pub mod listeners;
|
||||||
pub mod node;
|
pub mod node;
|
||||||
pub mod swarm;
|
pub mod swarm;
|
||||||
|
@ -20,10 +20,11 @@
|
|||||||
|
|
||||||
use fnv::FnvHashMap;
|
use fnv::FnvHashMap;
|
||||||
use futures::{prelude::*, future};
|
use futures::{prelude::*, future};
|
||||||
use muxing;
|
use muxing::StreamMuxer;
|
||||||
use nodes::collection::{
|
use nodes::collection::{
|
||||||
CollectionEvent, CollectionStream, PeerMut as CollecPeerMut, ReachAttemptId,
|
CollectionEvent, CollectionStream, PeerMut as CollecPeerMut, ReachAttemptId,
|
||||||
};
|
};
|
||||||
|
use nodes::handled_node::NodeHandler;
|
||||||
use nodes::listeners::{ListenersEvent, ListenersStream};
|
use nodes::listeners::{ListenersEvent, ListenersStream};
|
||||||
use nodes::node::Substream;
|
use nodes::node::Substream;
|
||||||
use std::collections::hash_map::{Entry, OccupiedEntry};
|
use std::collections::hash_map::{Entry, OccupiedEntry};
|
||||||
@ -32,16 +33,15 @@ use void::Void;
|
|||||||
use {Endpoint, Multiaddr, PeerId, Transport};
|
use {Endpoint, Multiaddr, PeerId, Transport};
|
||||||
|
|
||||||
/// Implementation of `Stream` that handles the nodes.
|
/// Implementation of `Stream` that handles the nodes.
|
||||||
pub struct Swarm<TTrans, TMuxer, TUserData>
|
pub struct Swarm<TTrans, TInEvent, TOutEvent, THandlerBuild>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
TMuxer: muxing::StreamMuxer,
|
|
||||||
{
|
{
|
||||||
/// Listeners for incoming connections.
|
/// Listeners for incoming connections.
|
||||||
listeners: ListenersStream<TTrans>,
|
listeners: ListenersStream<TTrans>,
|
||||||
|
|
||||||
/// The nodes currently active.
|
/// The nodes currently active.
|
||||||
active_nodes: CollectionStream<TMuxer, TUserData>,
|
active_nodes: CollectionStream<TInEvent, TOutEvent>,
|
||||||
|
|
||||||
/// Attempts to reach a peer.
|
/// Attempts to reach a peer.
|
||||||
out_reach_attempts: FnvHashMap<PeerId, OutReachAttempt>,
|
out_reach_attempts: FnvHashMap<PeerId, OutReachAttempt>,
|
||||||
@ -52,6 +52,9 @@ where
|
|||||||
|
|
||||||
/// For each peer ID we're connected to, contains the multiaddress we're connected to.
|
/// For each peer ID we're connected to, contains the multiaddress we're connected to.
|
||||||
connected_multiaddresses: FnvHashMap<PeerId, Multiaddr>,
|
connected_multiaddresses: FnvHashMap<PeerId, Multiaddr>,
|
||||||
|
|
||||||
|
/// Object that builds new handlers.
|
||||||
|
handler_build: THandlerBuild,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Attempt to reach a peer.
|
/// Attempt to reach a peer.
|
||||||
@ -66,10 +69,9 @@ struct OutReachAttempt {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Event that can happen on the `Swarm`.
|
/// Event that can happen on the `Swarm`.
|
||||||
pub enum SwarmEvent<TTrans, TMuxer, TUserData>
|
pub enum SwarmEvent<TTrans, TOutEvent>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
TMuxer: muxing::StreamMuxer,
|
|
||||||
{
|
{
|
||||||
/// One of the listeners gracefully closed.
|
/// One of the listeners gracefully closed.
|
||||||
ListenerClosed {
|
ListenerClosed {
|
||||||
@ -108,8 +110,6 @@ where
|
|||||||
Replaced {
|
Replaced {
|
||||||
/// Id of the peer.
|
/// Id of the peer.
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
/// Outbound substream attempts that have been closed in the process.
|
|
||||||
closed_outbound_substreams: Vec<TUserData>,
|
|
||||||
/// Multiaddr we were connected to, or `None` if it was unknown.
|
/// Multiaddr we were connected to, or `None` if it was unknown.
|
||||||
closed_multiaddr: Option<Multiaddr>,
|
closed_multiaddr: Option<Multiaddr>,
|
||||||
/// If `Listener`, then we received the connection. If `Dial`, then it's a connection that
|
/// If `Listener`, then we received the connection. If `Dial`, then it's a connection that
|
||||||
@ -136,8 +136,6 @@ where
|
|||||||
address: Option<Multiaddr>,
|
address: Option<Multiaddr>,
|
||||||
/// The error that happened.
|
/// The error that happened.
|
||||||
error: IoError,
|
error: IoError,
|
||||||
/// Pending outbound substreams that were cancelled.
|
|
||||||
closed_outbound_substreams: Vec<TUserData>,
|
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Failed to reach a peer that we were trying to dial.
|
/// Failed to reach a peer that we were trying to dial.
|
||||||
@ -183,46 +181,12 @@ where
|
|||||||
remain_addrs_attempt: usize,
|
remain_addrs_attempt: usize,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// A new inbound substream arrived.
|
/// A node produced a custom event.
|
||||||
InboundSubstream {
|
NodeEvent {
|
||||||
/// Id of the peer we received a substream from.
|
/// Id of the node that produced the event.
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
/// The newly-opened substream.
|
/// Event that was produced by the node.
|
||||||
substream: Substream<TMuxer>,
|
event: TOutEvent,
|
||||||
},
|
|
||||||
|
|
||||||
/// An outbound substream has successfully been opened.
|
|
||||||
OutboundSubstream {
|
|
||||||
/// Id of the peer we received a substream from.
|
|
||||||
peer_id: PeerId,
|
|
||||||
/// User data that has been passed to the `open_substream` method.
|
|
||||||
user_data: TUserData,
|
|
||||||
/// The newly-opened substream.
|
|
||||||
substream: Substream<TMuxer>,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// The inbound side of a muxer has been gracefully closed. No more inbound substreams will
|
|
||||||
/// be produced.
|
|
||||||
InboundClosed {
|
|
||||||
/// Id of the peer.
|
|
||||||
peer_id: PeerId,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// An outbound substream couldn't be opened because the muxer is no longer capable of opening
|
|
||||||
/// more substreams.
|
|
||||||
OutboundClosed {
|
|
||||||
/// Id of the peer we were trying to open a substream with.
|
|
||||||
peer_id: PeerId,
|
|
||||||
/// User data that has been passed to the `open_substream` method.
|
|
||||||
user_data: TUserData,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// The multiaddress of the node has been resolved.
|
|
||||||
NodeMultiaddr {
|
|
||||||
/// Identifier of the node.
|
|
||||||
peer_id: PeerId,
|
|
||||||
/// Address that has been resolved.
|
|
||||||
address: Result<Multiaddr, IoError>,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -271,14 +235,38 @@ impl ConnectedPoint {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTrans, TMuxer, TUserData> Swarm<TTrans, TMuxer, TUserData>
|
/// Trait for structures that can create new factories.
|
||||||
|
pub trait HandlerFactory {
|
||||||
|
/// The generated handler.
|
||||||
|
type Handler;
|
||||||
|
|
||||||
|
/// Creates a new handler.
|
||||||
|
fn new_handler(&self) -> Self::Handler;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, THandler> HandlerFactory for T where T: Fn() -> THandler {
|
||||||
|
type Handler = THandler;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn new_handler(&self) -> THandler {
|
||||||
|
(*self)()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerBuild>
|
||||||
|
Swarm<TTrans, TInEvent, TOutEvent, THandlerBuild>
|
||||||
where
|
where
|
||||||
TTrans: Transport + Clone,
|
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||||
TMuxer: muxing::StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
|
THandlerBuild: HandlerFactory<Handler = THandler>,
|
||||||
|
THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
|
||||||
|
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
{
|
{
|
||||||
/// Creates a new node events stream.
|
/// Creates a new node events stream.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new(transport: TTrans) -> Self {
|
pub fn new(transport: TTrans) -> Swarm<TTrans, TInEvent, TOutEvent, fn() -> THandler>
|
||||||
|
where THandler: Default,
|
||||||
|
{
|
||||||
// TODO: with_capacity?
|
// TODO: with_capacity?
|
||||||
Swarm {
|
Swarm {
|
||||||
listeners: ListenersStream::new(transport),
|
listeners: ListenersStream::new(transport),
|
||||||
@ -286,6 +274,21 @@ where
|
|||||||
out_reach_attempts: Default::default(),
|
out_reach_attempts: Default::default(),
|
||||||
other_reach_attempts: Vec::new(),
|
other_reach_attempts: Vec::new(),
|
||||||
connected_multiaddresses: Default::default(),
|
connected_multiaddresses: Default::default(),
|
||||||
|
handler_build: Default::default,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Same as `new`, but lets you specify a way to build a node handler.
|
||||||
|
#[inline]
|
||||||
|
pub fn with_handler_builder(transport: TTrans, handler_build: THandlerBuild) -> Self {
|
||||||
|
// TODO: with_capacity?
|
||||||
|
Swarm {
|
||||||
|
listeners: ListenersStream::new(transport),
|
||||||
|
active_nodes: CollectionStream::new(),
|
||||||
|
out_reach_attempts: Default::default(),
|
||||||
|
other_reach_attempts: Vec::new(),
|
||||||
|
connected_multiaddresses: Default::default(),
|
||||||
|
handler_build,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -318,7 +321,10 @@ where
|
|||||||
pub fn nat_traversal<'a>(
|
pub fn nat_traversal<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
observed_addr: &'a Multiaddr,
|
observed_addr: &'a Multiaddr,
|
||||||
) -> impl Iterator<Item = Multiaddr> + 'a {
|
) -> impl Iterator<Item = Multiaddr> + 'a
|
||||||
|
where TMuxer: 'a,
|
||||||
|
THandler: 'a,
|
||||||
|
{
|
||||||
self.listeners()
|
self.listeners()
|
||||||
.flat_map(move |server| self.transport().nat_traversal(server, observed_addr))
|
.flat_map(move |server| self.transport().nat_traversal(server, observed_addr))
|
||||||
}
|
}
|
||||||
@ -329,17 +335,18 @@ where
|
|||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::MultiaddrFuture: Send + 'static,
|
TTrans::MultiaddrFuture: Send + 'static,
|
||||||
TMuxer: Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TUserData: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
|
TOutEvent: Send + 'static,
|
||||||
{
|
{
|
||||||
let future = match self.transport().clone().dial(addr.clone()) {
|
let future = match self.transport().clone().dial(addr.clone()) {
|
||||||
Ok(fut) => fut,
|
Ok(fut) => fut,
|
||||||
Err((_, addr)) => return Err(addr),
|
Err((_, addr)) => return Err(addr),
|
||||||
};
|
};
|
||||||
|
|
||||||
let reach_id = self.active_nodes.add_reach_attempt(future);
|
let reach_id = self.active_nodes.add_reach_attempt(future, self.handler_build.new_handler());
|
||||||
self.other_reach_attempts
|
self.other_reach_attempts
|
||||||
.push((reach_id, ConnectedPoint::Dialer { address: addr }));
|
.push((reach_id, ConnectedPoint::Dialer { address: addr }));
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -360,12 +367,17 @@ where
|
|||||||
.count()
|
.count()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sends an event to all nodes.
|
||||||
|
#[inline]
|
||||||
|
pub fn broadcast_event(&mut self, event: &TInEvent)
|
||||||
|
where TInEvent: Clone,
|
||||||
|
{
|
||||||
|
self.active_nodes.broadcast_event(event)
|
||||||
|
}
|
||||||
|
|
||||||
/// Grants access to a struct that represents a peer.
|
/// Grants access to a struct that represents a peer.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn peer(&mut self, peer_id: PeerId) -> Peer<TTrans, TMuxer, TUserData>
|
pub fn peer(&mut self, peer_id: PeerId) -> Peer<TTrans, TInEvent, TOutEvent, THandlerBuild> {
|
||||||
where
|
|
||||||
TUserData: Send + 'static,
|
|
||||||
{
|
|
||||||
// TODO: we do `peer_mut(...).is_some()` followed with `peer_mut(...).unwrap()`, otherwise
|
// TODO: we do `peer_mut(...).is_some()` followed with `peer_mut(...).unwrap()`, otherwise
|
||||||
// the borrow checker yells at us.
|
// the borrow checker yells at us.
|
||||||
|
|
||||||
@ -409,16 +421,17 @@ where
|
|||||||
&mut self,
|
&mut self,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
reach_id: ReachAttemptId,
|
reach_id: ReachAttemptId,
|
||||||
closed_outbound_substreams: Option<Vec<TUserData>>,
|
replaced: bool,
|
||||||
) -> SwarmEvent<TTrans, TMuxer, TUserData>
|
) -> SwarmEvent<TTrans, TOutEvent>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::MultiaddrFuture: Send + 'static,
|
TTrans::MultiaddrFuture: Send + 'static,
|
||||||
TMuxer: Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TUserData: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
|
TOutEvent: Send + 'static,
|
||||||
{
|
{
|
||||||
// We first start looking in the incoming attempts. While this makes the code less optimal,
|
// We first start looking in the incoming attempts. While this makes the code less optimal,
|
||||||
// it also makes the logic easier.
|
// it also makes the logic easier.
|
||||||
@ -442,12 +455,11 @@ where
|
|||||||
out_reach_attempts should always be in sync with the actual attempts");
|
out_reach_attempts should always be in sync with the actual attempts");
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(closed_outbound_substreams) = closed_outbound_substreams {
|
if replaced {
|
||||||
return SwarmEvent::Replaced {
|
return SwarmEvent::Replaced {
|
||||||
peer_id,
|
peer_id,
|
||||||
endpoint,
|
endpoint,
|
||||||
closed_multiaddr,
|
closed_multiaddr,
|
||||||
closed_outbound_substreams,
|
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
return SwarmEvent::Connected { peer_id, endpoint };
|
return SwarmEvent::Connected { peer_id, endpoint };
|
||||||
@ -474,12 +486,11 @@ where
|
|||||||
address: attempt.cur_attempted,
|
address: attempt.cur_attempted,
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(closed_outbound_substreams) = closed_outbound_substreams {
|
if replaced {
|
||||||
return SwarmEvent::Replaced {
|
return SwarmEvent::Replaced {
|
||||||
peer_id,
|
peer_id,
|
||||||
endpoint,
|
endpoint,
|
||||||
closed_multiaddr,
|
closed_multiaddr,
|
||||||
closed_outbound_substreams,
|
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
return SwarmEvent::Connected { peer_id, endpoint };
|
return SwarmEvent::Connected { peer_id, endpoint };
|
||||||
@ -500,23 +511,22 @@ where
|
|||||||
let num_remain = attempt.next_attempts.len();
|
let num_remain = attempt.next_attempts.len();
|
||||||
let failed_addr = attempt.cur_attempted.clone();
|
let failed_addr = attempt.cur_attempted.clone();
|
||||||
|
|
||||||
let opened_attempts = self.active_nodes.peer_mut(&peer_id)
|
self.active_nodes.peer_mut(&peer_id)
|
||||||
.expect("When we receive a NodeReached or NodeReplaced event from active_nodes, \
|
.expect("When we receive a NodeReached or NodeReplaced event from active_nodes, \
|
||||||
it is guaranteed that the PeerId is valid and therefore that \
|
it is guaranteed that the PeerId is valid and therefore that \
|
||||||
active_nodes.peer_mut succeeds with this ID. handle_node_reached is \
|
active_nodes.peer_mut succeeds with this ID. handle_node_reached is \
|
||||||
called only to handle these events.")
|
called only to handle these events.")
|
||||||
.close();
|
.close();
|
||||||
debug_assert!(opened_attempts.is_empty());
|
|
||||||
|
|
||||||
if !attempt.next_attempts.is_empty() {
|
if !attempt.next_attempts.is_empty() {
|
||||||
let mut attempt = attempt;
|
let mut attempt = attempt;
|
||||||
attempt.cur_attempted = attempt.next_attempts.remove(0);
|
attempt.cur_attempted = attempt.next_attempts.remove(0);
|
||||||
attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) {
|
attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) {
|
||||||
Ok(fut) => self.active_nodes.add_reach_attempt(fut),
|
Ok(fut) => self.active_nodes.add_reach_attempt(fut, self.handler_build.new_handler()),
|
||||||
Err((_, addr)) => {
|
Err((_, addr)) => {
|
||||||
let msg = format!("unsupported multiaddr {}", addr);
|
let msg = format!("unsupported multiaddr {}", addr);
|
||||||
let fut = future::err(IoError::new(IoErrorKind::Other, msg));
|
let fut = future::err(IoError::new(IoErrorKind::Other, msg));
|
||||||
self.active_nodes.add_reach_attempt::<_, future::FutureResult<Multiaddr, IoError>>(fut)
|
self.active_nodes.add_reach_attempt::<_, _, future::FutureResult<Multiaddr, IoError>, _>(fut, self.handler_build.new_handler())
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -550,15 +560,16 @@ where
|
|||||||
&mut self,
|
&mut self,
|
||||||
reach_id: ReachAttemptId,
|
reach_id: ReachAttemptId,
|
||||||
error: IoError,
|
error: IoError,
|
||||||
) -> Option<SwarmEvent<TTrans, TMuxer, TUserData>>
|
) -> Option<SwarmEvent<TTrans, TOutEvent>>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::MultiaddrFuture: Send + 'static,
|
TTrans::MultiaddrFuture: Send + 'static,
|
||||||
TMuxer: Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TUserData: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
|
TOutEvent: Send + 'static,
|
||||||
{
|
{
|
||||||
// Search for the attempt in `out_reach_attempts`.
|
// Search for the attempt in `out_reach_attempts`.
|
||||||
// TODO: could be more optimal than iterating over everything
|
// TODO: could be more optimal than iterating over everything
|
||||||
@ -578,11 +589,11 @@ where
|
|||||||
let mut attempt = attempt;
|
let mut attempt = attempt;
|
||||||
attempt.cur_attempted = attempt.next_attempts.remove(0);
|
attempt.cur_attempted = attempt.next_attempts.remove(0);
|
||||||
attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) {
|
attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) {
|
||||||
Ok(fut) => self.active_nodes.add_reach_attempt(fut),
|
Ok(fut) => self.active_nodes.add_reach_attempt(fut, self.handler_build.new_handler()),
|
||||||
Err((_, addr)) => {
|
Err((_, addr)) => {
|
||||||
let msg = format!("unsupported multiaddr {}", addr);
|
let msg = format!("unsupported multiaddr {}", addr);
|
||||||
let fut = future::err(IoError::new(IoErrorKind::Other, msg));
|
let fut = future::err(IoError::new(IoErrorKind::Other, msg));
|
||||||
self.active_nodes.add_reach_attempt::<_, future::FutureResult<Multiaddr, IoError>>(fut)
|
self.active_nodes.add_reach_attempt::<_, _, future::FutureResult<Multiaddr, IoError>, _>(fut, self.handler_build.new_handler())
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -628,35 +639,36 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// State of a peer in the system.
|
/// State of a peer in the system.
|
||||||
pub enum Peer<'a, TTrans, TMuxer, TUserData>
|
pub enum Peer<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandlerBuild: 'a>
|
||||||
where
|
where
|
||||||
TTrans: Transport + 'a,
|
TTrans: Transport,
|
||||||
TMuxer: muxing::StreamMuxer + 'a,
|
|
||||||
TUserData: Send + 'static,
|
|
||||||
{
|
{
|
||||||
/// We are connected to this peer.
|
/// We are connected to this peer.
|
||||||
Connected(PeerConnected<'a, TUserData>),
|
Connected(PeerConnected<'a, TInEvent>),
|
||||||
|
|
||||||
/// We are currently attempting to connect to this peer.
|
/// We are currently attempting to connect to this peer.
|
||||||
PendingConnect(PeerPendingConnect<'a, TMuxer, TUserData>),
|
PendingConnect(PeerPendingConnect<'a, TInEvent, TOutEvent>),
|
||||||
|
|
||||||
/// We are not connected to this peer at all.
|
/// We are not connected to this peer at all.
|
||||||
///
|
///
|
||||||
/// > **Note**: It is however possible that a pending incoming connection is being negotiated
|
/// > **Note**: It is however possible that a pending incoming connection is being negotiated
|
||||||
/// > and will connect to this peer, but we don't know it yet.
|
/// > and will connect to this peer, but we don't know it yet.
|
||||||
NotConnected(PeerNotConnected<'a, TTrans, TMuxer, TUserData>),
|
NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandlerBuild>),
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add other similar methods that wrap to the ones of `PeerNotConnected`
|
// TODO: add other similar methods that wrap to the ones of `PeerNotConnected`
|
||||||
impl<'a, TTrans, TMuxer, TUserData> Peer<'a, TTrans, TMuxer, TUserData>
|
impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerBuild>
|
||||||
|
Peer<'a, TTrans, TInEvent, TOutEvent, THandlerBuild>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport<Output = (PeerId, TMuxer)>,
|
||||||
TMuxer: muxing::StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
TUserData: Send + 'static,
|
THandlerBuild: HandlerFactory<Handler = THandler>,
|
||||||
|
THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
|
||||||
|
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
{
|
{
|
||||||
/// If we are connected, returns the `PeerConnected`.
|
/// If we are connected, returns the `PeerConnected`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_connected(self) -> Option<PeerConnected<'a, TUserData>> {
|
pub fn as_connected(self) -> Option<PeerConnected<'a, TInEvent>> {
|
||||||
match self {
|
match self {
|
||||||
Peer::Connected(peer) => Some(peer),
|
Peer::Connected(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -665,7 +677,7 @@ where
|
|||||||
|
|
||||||
/// If a connection is pending, returns the `PeerPendingConnect`.
|
/// If a connection is pending, returns the `PeerPendingConnect`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TMuxer, TUserData>> {
|
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TInEvent, TOutEvent>> {
|
||||||
match self {
|
match self {
|
||||||
Peer::PendingConnect(peer) => Some(peer),
|
Peer::PendingConnect(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -674,7 +686,7 @@ where
|
|||||||
|
|
||||||
/// If we are not connected, returns the `PeerNotConnected`.
|
/// If we are not connected, returns the `PeerNotConnected`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_not_connected(self) -> Option<PeerNotConnected<'a, TTrans, TMuxer, TUserData>> {
|
pub fn as_not_connected(self) -> Option<PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandlerBuild>> {
|
||||||
match self {
|
match self {
|
||||||
Peer::NotConnected(peer) => Some(peer),
|
Peer::NotConnected(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -686,14 +698,16 @@ where
|
|||||||
pub fn or_connect(
|
pub fn or_connect(
|
||||||
self,
|
self,
|
||||||
addr: Multiaddr,
|
addr: Multiaddr,
|
||||||
) -> Result<PeerPotentialConnect<'a, TMuxer, TUserData>, Self>
|
) -> Result<PeerPotentialConnect<'a, TInEvent, TOutEvent>, Self>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::MultiaddrFuture: Send + 'static,
|
TTrans::MultiaddrFuture: Send + 'static,
|
||||||
TMuxer: Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
|
TInEvent: Send + 'static,
|
||||||
|
TOutEvent: Send + 'static,
|
||||||
{
|
{
|
||||||
self.or_connect_with(move |_| addr)
|
self.or_connect_with(move |_| addr)
|
||||||
}
|
}
|
||||||
@ -704,15 +718,17 @@ where
|
|||||||
pub fn or_connect_with<TFn>(
|
pub fn or_connect_with<TFn>(
|
||||||
self,
|
self,
|
||||||
addr: TFn,
|
addr: TFn,
|
||||||
) -> Result<PeerPotentialConnect<'a, TMuxer, TUserData>, Self>
|
) -> Result<PeerPotentialConnect<'a, TInEvent, TOutEvent>, Self>
|
||||||
where
|
where
|
||||||
TFn: FnOnce(&PeerId) -> Multiaddr,
|
TFn: FnOnce(&PeerId) -> Multiaddr,
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::MultiaddrFuture: Send + 'static,
|
TTrans::MultiaddrFuture: Send + 'static,
|
||||||
TMuxer: Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
|
TInEvent: Send + 'static,
|
||||||
|
TOutEvent: Send + 'static,
|
||||||
{
|
{
|
||||||
match self {
|
match self {
|
||||||
Peer::Connected(peer) => Ok(PeerPotentialConnect::Connected(peer)),
|
Peer::Connected(peer) => Ok(PeerPotentialConnect::Connected(peer)),
|
||||||
@ -729,42 +745,31 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Peer we are potentially going to connect to.
|
/// Peer we are potentially going to connect to.
|
||||||
pub enum PeerPotentialConnect<'a, TMuxer, TUserData>
|
pub enum PeerPotentialConnect<'a, TInEvent: 'a, TOutEvent: 'a> {
|
||||||
where
|
|
||||||
TUserData: Send + 'static,
|
|
||||||
TMuxer: muxing::StreamMuxer + 'a,
|
|
||||||
{
|
|
||||||
/// We are connected to this peer.
|
/// We are connected to this peer.
|
||||||
Connected(PeerConnected<'a, TUserData>),
|
Connected(PeerConnected<'a, TInEvent>),
|
||||||
|
|
||||||
/// We are currently attempting to connect to this peer.
|
/// We are currently attempting to connect to this peer.
|
||||||
PendingConnect(PeerPendingConnect<'a, TMuxer, TUserData>),
|
PendingConnect(PeerPendingConnect<'a, TInEvent, TOutEvent>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TMuxer, TUserData> PeerPotentialConnect<'a, TMuxer, TUserData>
|
impl<'a, TInEvent, TOutEvent> PeerPotentialConnect<'a, TInEvent, TOutEvent> {
|
||||||
where
|
|
||||||
TUserData: Send + 'static,
|
|
||||||
TMuxer: muxing::StreamMuxer,
|
|
||||||
{
|
|
||||||
/// Closes the connection or the connection attempt.
|
/// Closes the connection or the connection attempt.
|
||||||
///
|
///
|
||||||
/// If the connection was active, returns the list of outbound substream openings that were
|
/// If the connection was active, returns the list of outbound substream openings that were
|
||||||
/// closed in the process.
|
/// closed in the process.
|
||||||
// TODO: consider returning a `PeerNotConnected`
|
// TODO: consider returning a `PeerNotConnected`
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn close(self) -> Vec<TUserData> {
|
pub fn close(self) {
|
||||||
match self {
|
match self {
|
||||||
PeerPotentialConnect::Connected(peer) => peer.close(),
|
PeerPotentialConnect::Connected(peer) => peer.close(),
|
||||||
PeerPotentialConnect::PendingConnect(peer) => {
|
PeerPotentialConnect::PendingConnect(peer) => peer.interrupt(),
|
||||||
peer.interrupt();
|
|
||||||
Vec::new()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If we are connected, returns the `PeerConnected`.
|
/// If we are connected, returns the `PeerConnected`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_connected(self) -> Option<PeerConnected<'a, TUserData>> {
|
pub fn as_connected(self) -> Option<PeerConnected<'a, TInEvent>> {
|
||||||
match self {
|
match self {
|
||||||
PeerPotentialConnect::Connected(peer) => Some(peer),
|
PeerPotentialConnect::Connected(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -773,7 +778,7 @@ where
|
|||||||
|
|
||||||
/// If a connection is pending, returns the `PeerPendingConnect`.
|
/// If a connection is pending, returns the `PeerPendingConnect`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TMuxer, TUserData>> {
|
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TInEvent, TOutEvent>> {
|
||||||
match self {
|
match self {
|
||||||
PeerPotentialConnect::PendingConnect(peer) => Some(peer),
|
PeerPotentialConnect::PendingConnect(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -782,27 +787,20 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Access to a peer we are connected to.
|
/// Access to a peer we are connected to.
|
||||||
pub struct PeerConnected<'a, TUserData>
|
pub struct PeerConnected<'a, TInEvent: 'a> {
|
||||||
where
|
peer: CollecPeerMut<'a, TInEvent>,
|
||||||
TUserData: Send + 'static,
|
|
||||||
{
|
|
||||||
peer: CollecPeerMut<'a, TUserData>,
|
|
||||||
/// Reference to the `connected_multiaddresses` field of the parent.
|
/// Reference to the `connected_multiaddresses` field of the parent.
|
||||||
connected_multiaddresses: &'a mut FnvHashMap<PeerId, Multiaddr>,
|
connected_multiaddresses: &'a mut FnvHashMap<PeerId, Multiaddr>,
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TUserData> PeerConnected<'a, TUserData>
|
impl<'a, TInEvent> PeerConnected<'a, TInEvent> {
|
||||||
where
|
|
||||||
TUserData: Send + 'static,
|
|
||||||
{
|
|
||||||
/// Closes the connection to this node.
|
/// Closes the connection to this node.
|
||||||
///
|
///
|
||||||
/// This interrupts all the current substream opening attempts and returns them.
|
|
||||||
/// No `NodeClosed` message will be generated for this node.
|
/// No `NodeClosed` message will be generated for this node.
|
||||||
// TODO: consider returning a `PeerNotConnected` ; however this makes all the borrows things
|
// TODO: consider returning a `PeerNotConnected` ; however this makes all the borrows things
|
||||||
// much more annoying to deal with
|
// much more annoying to deal with
|
||||||
pub fn close(self) -> Vec<TUserData> {
|
pub fn close(self) {
|
||||||
self.connected_multiaddresses.remove(&self.peer_id);
|
self.connected_multiaddresses.remove(&self.peer_id);
|
||||||
self.peer.close()
|
self.peer.close()
|
||||||
}
|
}
|
||||||
@ -813,28 +811,20 @@ where
|
|||||||
self.connected_multiaddresses.get(&self.peer_id)
|
self.connected_multiaddresses.get(&self.peer_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Starts the process of opening a new outbound substream towards the peer.
|
/// Sends an event to the node.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn open_substream(&mut self, user_data: TUserData) {
|
pub fn send_event(&mut self, event: TInEvent) {
|
||||||
self.peer.open_substream(user_data)
|
self.peer.send_event(event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Access to a peer we are attempting to connect to.
|
/// Access to a peer we are attempting to connect to.
|
||||||
pub struct PeerPendingConnect<'a, TMuxer, TUserData>
|
pub struct PeerPendingConnect<'a, TInEvent: 'a, TOutEvent: 'a> {
|
||||||
where
|
|
||||||
TUserData: Send + 'static,
|
|
||||||
TMuxer: muxing::StreamMuxer + 'a,
|
|
||||||
{
|
|
||||||
attempt: OccupiedEntry<'a, PeerId, OutReachAttempt>,
|
attempt: OccupiedEntry<'a, PeerId, OutReachAttempt>,
|
||||||
active_nodes: &'a mut CollectionStream<TMuxer, TUserData>,
|
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TMuxer, TUserData> PeerPendingConnect<'a, TMuxer, TUserData>
|
impl<'a, TInEvent, TOutEvent> PeerPendingConnect<'a, TInEvent, TOutEvent> {
|
||||||
where
|
|
||||||
TUserData: Send + 'static,
|
|
||||||
TMuxer: muxing::StreamMuxer,
|
|
||||||
{
|
|
||||||
/// Interrupt this connection attempt.
|
/// Interrupt this connection attempt.
|
||||||
// TODO: consider returning a PeerNotConnected ; however that is really pain in terms of
|
// TODO: consider returning a PeerNotConnected ; however that is really pain in terms of
|
||||||
// borrows
|
// borrows
|
||||||
@ -875,33 +865,35 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Access to a peer we're not connected to.
|
/// Access to a peer we're not connected to.
|
||||||
pub struct PeerNotConnected<'a, TTrans, TMuxer, TUserData>
|
pub struct PeerNotConnected<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandlerBuild: 'a>
|
||||||
where
|
|
||||||
TTrans: Transport + 'a,
|
|
||||||
TMuxer: muxing::StreamMuxer + 'a,
|
|
||||||
TUserData: Send + 'a,
|
|
||||||
{
|
|
||||||
peer_id: PeerId,
|
|
||||||
nodes: &'a mut Swarm<TTrans, TMuxer, TUserData>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a, TTrans, TMuxer, TUserData> PeerNotConnected<'a, TTrans, TMuxer, TUserData>
|
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
TMuxer: muxing::StreamMuxer,
|
{
|
||||||
TUserData: Send,
|
peer_id: PeerId,
|
||||||
|
nodes: &'a mut Swarm<TTrans, TInEvent, TOutEvent, THandlerBuild>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerBuild>
|
||||||
|
PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandlerBuild>
|
||||||
|
where
|
||||||
|
TTrans: Transport<Output = (PeerId, TMuxer)>,
|
||||||
|
TMuxer: StreamMuxer,
|
||||||
|
THandlerBuild: HandlerFactory<Handler = THandler>,
|
||||||
|
THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
|
||||||
|
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
{
|
{
|
||||||
/// Attempts a new connection to this node using the given multiaddress.
|
/// Attempts a new connection to this node using the given multiaddress.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn connect(self, addr: Multiaddr) -> Result<PeerPendingConnect<'a, TMuxer, TUserData>, Self>
|
pub fn connect(self, addr: Multiaddr) -> Result<PeerPendingConnect<'a, TInEvent, TOutEvent>, Self>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::MultiaddrFuture: Send + 'static,
|
TTrans::MultiaddrFuture: Send + 'static,
|
||||||
TMuxer: Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TUserData: 'static,
|
TInEvent: Send + 'static,
|
||||||
|
TOutEvent: Send + 'static,
|
||||||
{
|
{
|
||||||
self.connect_inner(addr, Vec::new())
|
self.connect_inner(addr, Vec::new())
|
||||||
}
|
}
|
||||||
@ -915,16 +907,17 @@ where
|
|||||||
pub fn connect_iter<TIter>(
|
pub fn connect_iter<TIter>(
|
||||||
self,
|
self,
|
||||||
addrs: TIter,
|
addrs: TIter,
|
||||||
) -> Result<PeerPendingConnect<'a, TMuxer, TUserData>, Self>
|
) -> Result<PeerPendingConnect<'a, TInEvent, TOutEvent>, Self>
|
||||||
where
|
where
|
||||||
TIter: IntoIterator<Item = Multiaddr>,
|
TIter: IntoIterator<Item = Multiaddr>,
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::MultiaddrFuture: Send + 'static,
|
TTrans::MultiaddrFuture: Send + 'static,
|
||||||
TMuxer: Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TUserData: 'static,
|
TInEvent: Send + 'static,
|
||||||
|
TOutEvent: Send + 'static,
|
||||||
{
|
{
|
||||||
let mut addrs = addrs.into_iter();
|
let mut addrs = addrs.into_iter();
|
||||||
let first = addrs.next().unwrap(); // TODO: bad
|
let first = addrs.next().unwrap(); // TODO: bad
|
||||||
@ -937,22 +930,23 @@ where
|
|||||||
self,
|
self,
|
||||||
first: Multiaddr,
|
first: Multiaddr,
|
||||||
rest: Vec<Multiaddr>,
|
rest: Vec<Multiaddr>,
|
||||||
) -> Result<PeerPendingConnect<'a, TMuxer, TUserData>, Self>
|
) -> Result<PeerPendingConnect<'a, TInEvent, TOutEvent>, Self>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::MultiaddrFuture: Send + 'static,
|
TTrans::MultiaddrFuture: Send + 'static,
|
||||||
TMuxer: Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TUserData: 'static,
|
TInEvent: Send + 'static,
|
||||||
|
TOutEvent: Send + 'static,
|
||||||
{
|
{
|
||||||
let future = match self.nodes.transport().clone().dial(first.clone()) {
|
let future = match self.nodes.transport().clone().dial(first.clone()) {
|
||||||
Ok(fut) => fut,
|
Ok(fut) => fut,
|
||||||
Err(_) => return Err(self),
|
Err(_) => return Err(self),
|
||||||
};
|
};
|
||||||
|
|
||||||
let reach_id = self.nodes.active_nodes.add_reach_attempt(future);
|
let reach_id = self.nodes.active_nodes.add_reach_attempt(future, self.nodes.handler_build.new_handler());
|
||||||
|
|
||||||
let former = self.nodes.out_reach_attempts.insert(
|
let former = self.nodes.out_reach_attempts.insert(
|
||||||
self.peer_id.clone(),
|
self.peer_id.clone(),
|
||||||
@ -976,18 +970,23 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTrans, TMuxer, TUserData> Stream for Swarm<TTrans, TMuxer, TUserData>
|
impl<TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerBuild> Stream for
|
||||||
|
Swarm<TTrans, TInEvent, TOutEvent, THandlerBuild>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
|
TTrans::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
|
||||||
TTrans::ListenerUpgrade: Send + 'static,
|
TTrans::ListenerUpgrade: Send + 'static,
|
||||||
TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TUserData: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
|
TOutEvent: Send + 'static,
|
||||||
|
THandlerBuild: HandlerFactory<Handler = THandler>,
|
||||||
|
THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
|
||||||
|
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
{
|
{
|
||||||
type Item = SwarmEvent<TTrans, TMuxer, TUserData>;
|
type Item = SwarmEvent<TTrans, TOutEvent>;
|
||||||
type Error = Void; // TODO: use `!` once stable
|
type Error = Void; // TODO: use `!` once stable
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
@ -998,7 +997,7 @@ where
|
|||||||
upgrade,
|
upgrade,
|
||||||
listen_addr,
|
listen_addr,
|
||||||
}))) => {
|
}))) => {
|
||||||
let id = self.active_nodes.add_reach_attempt(upgrade);
|
let id = self.active_nodes.add_reach_attempt(upgrade, self.handler_build.new_handler());
|
||||||
self.other_reach_attempts.push((
|
self.other_reach_attempts.push((
|
||||||
id,
|
id,
|
||||||
ConnectedPoint::Listener {
|
ConnectedPoint::Listener {
|
||||||
@ -1029,15 +1028,14 @@ where
|
|||||||
match self.active_nodes.poll() {
|
match self.active_nodes.poll() {
|
||||||
Ok(Async::NotReady) => break,
|
Ok(Async::NotReady) => break,
|
||||||
Ok(Async::Ready(Some(CollectionEvent::NodeReached { peer_id, id }))) => {
|
Ok(Async::Ready(Some(CollectionEvent::NodeReached { peer_id, id }))) => {
|
||||||
let event = self.handle_node_reached(peer_id, id, None);
|
let event = self.handle_node_reached(peer_id, id, false);
|
||||||
return Ok(Async::Ready(Some(event)));
|
return Ok(Async::Ready(Some(event)));
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
|
Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
|
||||||
peer_id,
|
peer_id,
|
||||||
id,
|
id,
|
||||||
closed_outbound_substreams,
|
|
||||||
}))) => {
|
}))) => {
|
||||||
let event = self.handle_node_reached(peer_id, id, Some(closed_outbound_substreams));
|
let event = self.handle_node_reached(peer_id, id, true);
|
||||||
return Ok(Async::Ready(Some(event)));
|
return Ok(Async::Ready(Some(event)));
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some(CollectionEvent::ReachError { id, error }))) => {
|
Ok(Async::Ready(Some(CollectionEvent::ReachError { id, error }))) => {
|
||||||
@ -1048,7 +1046,6 @@ where
|
|||||||
Ok(Async::Ready(Some(CollectionEvent::NodeError {
|
Ok(Async::Ready(Some(CollectionEvent::NodeError {
|
||||||
peer_id,
|
peer_id,
|
||||||
error,
|
error,
|
||||||
closed_outbound_substreams,
|
|
||||||
}))) => {
|
}))) => {
|
||||||
let address = self.connected_multiaddresses.remove(&peer_id);
|
let address = self.connected_multiaddresses.remove(&peer_id);
|
||||||
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
|
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
|
||||||
@ -1056,7 +1053,6 @@ where
|
|||||||
peer_id,
|
peer_id,
|
||||||
address,
|
address,
|
||||||
error,
|
error,
|
||||||
closed_outbound_substreams,
|
|
||||||
})));
|
})));
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some(CollectionEvent::NodeClosed { peer_id }))) => {
|
Ok(Async::Ready(Some(CollectionEvent::NodeClosed { peer_id }))) => {
|
||||||
@ -1064,49 +1060,8 @@ where
|
|||||||
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
|
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
|
||||||
return Ok(Async::Ready(Some(SwarmEvent::NodeClosed { peer_id, address })));
|
return Ok(Async::Ready(Some(SwarmEvent::NodeClosed { peer_id, address })));
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some(CollectionEvent::NodeMultiaddr { peer_id, address }))) => {
|
Ok(Async::Ready(Some(CollectionEvent::NodeEvent { peer_id, event }))) => {
|
||||||
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
|
return Ok(Async::Ready(Some(SwarmEvent::NodeEvent { peer_id, event })));
|
||||||
if let Ok(ref addr) = address {
|
|
||||||
self.connected_multiaddresses
|
|
||||||
.insert(peer_id.clone(), addr.clone());
|
|
||||||
}
|
|
||||||
return Ok(Async::Ready(Some(SwarmEvent::NodeMultiaddr {
|
|
||||||
peer_id,
|
|
||||||
address,
|
|
||||||
})));
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(Some(CollectionEvent::InboundSubstream {
|
|
||||||
peer_id,
|
|
||||||
substream,
|
|
||||||
}))) => {
|
|
||||||
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
|
|
||||||
return Ok(Async::Ready(Some(SwarmEvent::InboundSubstream {
|
|
||||||
peer_id,
|
|
||||||
substream,
|
|
||||||
})));
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(Some(CollectionEvent::OutboundSubstream {
|
|
||||||
peer_id,
|
|
||||||
user_data,
|
|
||||||
substream,
|
|
||||||
}))) => {
|
|
||||||
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
|
|
||||||
return Ok(Async::Ready(Some(SwarmEvent::OutboundSubstream {
|
|
||||||
peer_id,
|
|
||||||
substream,
|
|
||||||
user_data,
|
|
||||||
})));
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(Some(CollectionEvent::InboundClosed { peer_id }))) => {
|
|
||||||
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
|
|
||||||
return Ok(Async::Ready(Some(SwarmEvent::InboundClosed { peer_id })));
|
|
||||||
}
|
|
||||||
Ok(Async::Ready(Some(CollectionEvent::OutboundClosed { peer_id, user_data }))) => {
|
|
||||||
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
|
|
||||||
return Ok(Async::Ready(Some(SwarmEvent::OutboundClosed {
|
|
||||||
peer_id,
|
|
||||||
user_data,
|
|
||||||
})));
|
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(None)) => unreachable!("CollectionStream never ends"),
|
Ok(Async::Ready(None)) => unreachable!("CollectionStream never ends"),
|
||||||
Err(_) => unreachable!("CollectionStream never errors"),
|
Err(_) => unreachable!("CollectionStream never errors"),
|
||||||
|
Reference in New Issue
Block a user