mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-17 20:11:22 +00:00
Make nodes generic over PeerId (#881)
This commit is contained in:
@ -30,25 +30,27 @@ use crate::{
|
|||||||
};
|
};
|
||||||
use fnv::FnvHashMap;
|
use fnv::FnvHashMap;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use std::{collections::hash_map::Entry, error, fmt, mem};
|
use std::{collections::hash_map::Entry, error, fmt, hash::Hash, mem};
|
||||||
|
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
// TODO: make generic over PeerId
|
|
||||||
|
|
||||||
/// Implementation of `Stream` that handles a collection of nodes.
|
/// Implementation of `Stream` that handles a collection of nodes.
|
||||||
pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
|
pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId = PeerId> {
|
||||||
/// Object that handles the tasks.
|
/// Object that handles the tasks.
|
||||||
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>,
|
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>,
|
||||||
/// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
|
/// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
|
||||||
/// must always be in the `Connected` state.
|
/// must always be in the `Connected` state.
|
||||||
nodes: FnvHashMap<PeerId, TaskId>,
|
nodes: FnvHashMap<TPeerId, TaskId>,
|
||||||
/// List of tasks and their state. If `Connected`, then a corresponding entry must be present
|
/// List of tasks and their state. If `Connected`, then a corresponding entry must be present
|
||||||
/// in `nodes`.
|
/// in `nodes`.
|
||||||
tasks: FnvHashMap<TaskId, TaskState>,
|
tasks: FnvHashMap<TaskId, TaskState<TPeerId>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
|
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for
|
||||||
|
CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
|
||||||
|
where
|
||||||
|
TPeerId: fmt::Debug,
|
||||||
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
let mut list = f.debug_list();
|
let mut list = f.debug_list();
|
||||||
for (id, task) in &self.tasks {
|
for (id, task) in &self.tasks {
|
||||||
@ -67,18 +69,18 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for Colle
|
|||||||
|
|
||||||
/// State of a task.
|
/// State of a task.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
enum TaskState {
|
enum TaskState<TPeerId> {
|
||||||
/// Task is attempting to reach a peer.
|
/// Task is attempting to reach a peer.
|
||||||
Pending,
|
Pending,
|
||||||
/// The task is connected to a peer.
|
/// The task is connected to a peer.
|
||||||
Connected(PeerId),
|
Connected(TPeerId),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Event that can happen on the `CollectionStream`.
|
/// Event that can happen on the `CollectionStream`.
|
||||||
pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr> {
|
pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr, TPeerId> {
|
||||||
/// A connection to a node has succeeded. You must use the provided event in order to accept
|
/// A connection to a node has succeeded. You must use the provided event in order to accept
|
||||||
/// the connection.
|
/// the connection.
|
||||||
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>),
|
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>),
|
||||||
|
|
||||||
/// A connection to a node has been closed.
|
/// A connection to a node has been closed.
|
||||||
///
|
///
|
||||||
@ -86,7 +88,7 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr
|
|||||||
/// substream attempt is pending.
|
/// substream attempt is pending.
|
||||||
NodeClosed {
|
NodeClosed {
|
||||||
/// Identifier of the node.
|
/// Identifier of the node.
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// A connection to a node has errored.
|
/// A connection to a node has errored.
|
||||||
@ -94,7 +96,7 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr
|
|||||||
/// Can only happen after a node has been successfully reached.
|
/// Can only happen after a node has been successfully reached.
|
||||||
NodeError {
|
NodeError {
|
||||||
/// Identifier of the node.
|
/// Identifier of the node.
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
/// The error that happened.
|
/// The error that happened.
|
||||||
error: HandledNodeError<THandlerErr>,
|
error: HandledNodeError<THandlerErr>,
|
||||||
},
|
},
|
||||||
@ -112,16 +114,18 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr
|
|||||||
/// A node has produced an event.
|
/// A node has produced an event.
|
||||||
NodeEvent {
|
NodeEvent {
|
||||||
/// Identifier of the node.
|
/// Identifier of the node.
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
/// The produced event.
|
/// The produced event.
|
||||||
event: TOutEvent,
|
event: TOutEvent,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>
|
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for
|
||||||
|
CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
|
||||||
where TOutEvent: fmt::Debug,
|
where TOutEvent: fmt::Debug,
|
||||||
TReachErr: fmt::Debug,
|
TReachErr: fmt::Debug,
|
||||||
THandlerErr: fmt::Debug,
|
THandlerErr: fmt::Debug,
|
||||||
|
TPeerId: Eq + Hash + Clone + fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
match *self {
|
match *self {
|
||||||
@ -159,19 +163,23 @@ where TOutEvent: fmt::Debug,
|
|||||||
|
|
||||||
/// Event that happens when we reach a node.
|
/// Event that happens when we reach a node.
|
||||||
#[must_use = "The node reached event is used to accept the newly-opened connection"]
|
#[must_use = "The node reached event is used to accept the newly-opened connection"]
|
||||||
pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr: 'a> {
|
pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr: 'a, TPeerId: 'a = PeerId> {
|
||||||
/// Peer id we connected to.
|
/// Peer id we connected to.
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
/// The task id that reached the node.
|
/// The task id that reached the node.
|
||||||
id: TaskId,
|
id: TaskId,
|
||||||
/// The `CollectionStream` we are referencing.
|
/// The `CollectionStream` we are referencing.
|
||||||
parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>,
|
parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
|
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
|
||||||
|
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
|
||||||
|
where
|
||||||
|
TPeerId: Eq + Hash + Clone,
|
||||||
|
{
|
||||||
/// Returns the peer id of the node that has been reached.
|
/// Returns the peer id of the node that has been reached.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn peer_id(&self) -> &PeerId {
|
pub fn peer_id(&self) -> &TPeerId {
|
||||||
&self.peer_id
|
&self.peer_id
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -189,11 +197,11 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachE
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Accepts the new node.
|
/// Accepts the new node.
|
||||||
pub fn accept(self) -> (CollectionNodeAccept, PeerId) {
|
pub fn accept(self) -> (CollectionNodeAccept, TPeerId) {
|
||||||
// Set the state of the task to `Connected`.
|
// Set the state of the task to `Connected`.
|
||||||
let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id);
|
let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id);
|
||||||
let _former_state = self.parent.tasks.insert(self.id, TaskState::Connected(self.peer_id.clone()));
|
let _former_state = self.parent.tasks.insert(self.id, TaskState::Connected(self.peer_id.clone()));
|
||||||
debug_assert_eq!(_former_state, Some(TaskState::Pending));
|
debug_assert!(_former_state == Some(TaskState::Pending));
|
||||||
|
|
||||||
// It is possible that we already have a task connected to the same peer. In this
|
// It is possible that we already have a task connected to the same peer. In this
|
||||||
// case, we need to emit a `NodeReplaced` event.
|
// case, we need to emit a `NodeReplaced` event.
|
||||||
@ -204,7 +212,7 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachE
|
|||||||
self.nodes are valid tasks in the HandledNodesTasks; QED")
|
self.nodes are valid tasks in the HandledNodesTasks; QED")
|
||||||
.close();
|
.close();
|
||||||
let _former_other_state = self.parent.tasks.remove(&former_task_id);
|
let _former_other_state = self.parent.tasks.remove(&former_task_id);
|
||||||
debug_assert_eq!(_former_other_state, Some(TaskState::Connected(self.peer_id.clone())));
|
debug_assert!(_former_other_state == Some(TaskState::Connected(self.peer_id.clone())));
|
||||||
|
|
||||||
// TODO: we unfortunately have to clone the peer id here
|
// TODO: we unfortunately have to clone the peer id here
|
||||||
(CollectionNodeAccept::ReplacedExisting, self.peer_id.clone())
|
(CollectionNodeAccept::ReplacedExisting, self.peer_id.clone())
|
||||||
@ -223,7 +231,7 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachE
|
|||||||
///
|
///
|
||||||
/// Has the same effect as dropping the event without accepting it.
|
/// Has the same effect as dropping the event without accepting it.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn deny(self) -> PeerId {
|
pub fn deny(self) -> TPeerId {
|
||||||
// TODO: we unfortunately have to clone the id here, in order to be explicit
|
// TODO: we unfortunately have to clone the id here, in order to be explicit
|
||||||
let peer_id = self.peer_id.clone();
|
let peer_id = self.peer_id.clone();
|
||||||
drop(self); // Just to be explicit
|
drop(self); // Just to be explicit
|
||||||
@ -231,7 +239,11 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachE
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
|
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for
|
||||||
|
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
|
||||||
|
where
|
||||||
|
TPeerId: Eq + Hash + Clone + fmt::Debug,
|
||||||
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
f.debug_struct("CollectionReachEvent")
|
f.debug_struct("CollectionReachEvent")
|
||||||
.field("peer_id", &self.peer_id)
|
.field("peer_id", &self.peer_id)
|
||||||
@ -240,7 +252,9 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for C
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> Drop for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
|
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> Drop for
|
||||||
|
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
|
||||||
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let task_state = self.parent.tasks.remove(&self.id);
|
let task_state = self.parent.tasks.remove(&self.id);
|
||||||
debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false });
|
debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false });
|
||||||
@ -266,7 +280,11 @@ pub enum CollectionNodeAccept {
|
|||||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
pub struct ReachAttemptId(TaskId);
|
pub struct ReachAttemptId(TaskId);
|
||||||
|
|
||||||
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
|
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
|
||||||
|
CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
|
||||||
|
where
|
||||||
|
TPeerId: Eq + Hash + Clone,
|
||||||
|
{
|
||||||
/// Creates a new empty collection.
|
/// Creates a new empty collection.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
@ -284,8 +302,8 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
|
|||||||
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: THandler)
|
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: THandler)
|
||||||
-> ReachAttemptId
|
-> ReachAttemptId
|
||||||
where
|
where
|
||||||
TFut: Future<Item = (PeerId, TMuxer), Error = TReachErr> + Send + 'static,
|
TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr> + Send + 'static,
|
||||||
THandler: IntoNodeHandler + Send + 'static,
|
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
|
||||||
TReachErr: error::Error + Send + 'static,
|
TReachErr: error::Error + Send + 'static,
|
||||||
@ -294,6 +312,7 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
|
|||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
|
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
|
||||||
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
|
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
|
||||||
|
TPeerId: Send + 'static,
|
||||||
{
|
{
|
||||||
let id = self.inner.add_reach_attempt(future, handler);
|
let id = self.inner.add_reach_attempt(future, handler);
|
||||||
self.tasks.insert(id, TaskState::Pending);
|
self.tasks.insert(id, TaskState::Pending);
|
||||||
@ -338,7 +357,7 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
|
|||||||
///
|
///
|
||||||
/// Returns `None` if we don't have a connection to this peer.
|
/// Returns `None` if we don't have a connection to this peer.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn peer_mut(&mut self, id: &PeerId) -> Option<PeerMut<TInEvent>> {
|
pub fn peer_mut(&mut self, id: &TPeerId) -> Option<PeerMut<TInEvent, TPeerId>> {
|
||||||
let task = match self.nodes.get(id) {
|
let task = match self.nodes.get(id) {
|
||||||
Some(&task) => task,
|
Some(&task) => task,
|
||||||
None => return None,
|
None => return None,
|
||||||
@ -358,7 +377,7 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
|
|||||||
///
|
///
|
||||||
/// This will return true only after a `NodeReached` event has been produced by `poll()`.
|
/// This will return true only after a `NodeReached` event has been produced by `poll()`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn has_connection(&self, id: &PeerId) -> bool {
|
pub fn has_connection(&self, id: &TPeerId) -> bool {
|
||||||
self.nodes.contains_key(id)
|
self.nodes.contains_key(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -366,7 +385,7 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
|
|||||||
///
|
///
|
||||||
/// Does not include reach attempts that haven't reached any target yet.
|
/// Does not include reach attempts that haven't reached any target yet.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn connections(&self) -> impl Iterator<Item = &PeerId> {
|
pub fn connections(&self) -> impl Iterator<Item = &TPeerId> {
|
||||||
self.nodes.keys()
|
self.nodes.keys()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -375,7 +394,7 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
|
|||||||
/// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to
|
/// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to
|
||||||
/// > remove the `Err` variant, but also because we want the `CollectionStream` to stay
|
/// > remove the `Err` variant, but also because we want the `CollectionStream` to stay
|
||||||
/// > borrowed if necessary.
|
/// > borrowed if necessary.
|
||||||
pub fn poll(&mut self) -> Async<CollectionEvent<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>> {
|
pub fn poll(&mut self) -> Async<CollectionEvent<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>> {
|
||||||
let item = match self.inner.poll() {
|
let item = match self.inner.poll() {
|
||||||
Async::Ready(item) => item,
|
Async::Ready(item) => item,
|
||||||
Async::NotReady => return Async::NotReady,
|
Async::NotReady => return Async::NotReady,
|
||||||
@ -487,13 +506,16 @@ impl fmt::Display for InterruptError {
|
|||||||
impl error::Error for InterruptError {}
|
impl error::Error for InterruptError {}
|
||||||
|
|
||||||
/// Access to a peer in the collection.
|
/// Access to a peer in the collection.
|
||||||
pub struct PeerMut<'a, TInEvent: 'a> {
|
pub struct PeerMut<'a, TInEvent: 'a, TPeerId: 'a = PeerId> {
|
||||||
inner: HandledNodesTask<'a, TInEvent>,
|
inner: HandledNodesTask<'a, TInEvent>,
|
||||||
tasks: &'a mut FnvHashMap<TaskId, TaskState>,
|
tasks: &'a mut FnvHashMap<TaskId, TaskState<TPeerId>>,
|
||||||
nodes: &'a mut FnvHashMap<PeerId, TaskId>,
|
nodes: &'a mut FnvHashMap<TPeerId, TaskId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TInEvent> PeerMut<'a, TInEvent> {
|
impl<'a, TInEvent, TPeerId> PeerMut<'a, TInEvent, TPeerId>
|
||||||
|
where
|
||||||
|
TPeerId: Eq + Hash,
|
||||||
|
{
|
||||||
/// Sends an event to the given node.
|
/// Sends an event to the given node.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn send_event(&mut self, event: TInEvent) {
|
pub fn send_event(&mut self, event: TInEvent) {
|
||||||
|
@ -40,8 +40,6 @@ use void::Void;
|
|||||||
|
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
// TODO: make generic over PeerId
|
|
||||||
|
|
||||||
// Implementor notes
|
// Implementor notes
|
||||||
// =================
|
// =================
|
||||||
//
|
//
|
||||||
@ -59,7 +57,7 @@ mod tests;
|
|||||||
// conditions in the user's code. See similar comments in the documentation of `NodeStream`.
|
// conditions in the user's code. See similar comments in the documentation of `NodeStream`.
|
||||||
|
|
||||||
/// Implementation of `Stream` that handles a collection of nodes.
|
/// Implementation of `Stream` that handles a collection of nodes.
|
||||||
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
|
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId = PeerId> {
|
||||||
/// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts
|
/// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts
|
||||||
/// the task. It is possible that we receive messages from tasks that used to be in this list
|
/// the task. It is possible that we receive messages from tasks that used to be in this list
|
||||||
/// but no longer are, in which case we should ignore them.
|
/// but no longer are, in which case we should ignore them.
|
||||||
@ -73,12 +71,14 @@ pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THand
|
|||||||
to_spawn: SmallVec<[Box<Future<Item = (), Error = ()> + Send>; 8]>,
|
to_spawn: SmallVec<[Box<Future<Item = (), Error = ()> + Send>; 8]>,
|
||||||
|
|
||||||
/// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
|
/// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
|
||||||
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr>, TaskId)>,
|
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)>,
|
||||||
/// Receiver side for the events.
|
/// Receiver side for the events.
|
||||||
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr>, TaskId)>,
|
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> fmt::Debug for HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
|
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for
|
||||||
|
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
|
||||||
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
f.debug_list()
|
f.debug_list()
|
||||||
.entries(self.tasks.keys().cloned())
|
.entries(self.tasks.keys().cloned())
|
||||||
@ -122,30 +122,30 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Prototype for a `NodeHandler`.
|
/// Prototype for a `NodeHandler`.
|
||||||
pub trait IntoNodeHandler {
|
pub trait IntoNodeHandler<TPeerId = PeerId> {
|
||||||
/// The node handler.
|
/// The node handler.
|
||||||
type Handler: NodeHandler;
|
type Handler: NodeHandler;
|
||||||
|
|
||||||
/// Builds the node handler.
|
/// Builds the node handler.
|
||||||
///
|
///
|
||||||
/// The `PeerId` is the id of the node the handler is going to handle.
|
/// The `TPeerId` is the id of the node the handler is going to handle.
|
||||||
fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler;
|
fn into_handler(self, remote_peer_id: &TPeerId) -> Self::Handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> IntoNodeHandler for T
|
impl<T, TPeerId> IntoNodeHandler<TPeerId> for T
|
||||||
where T: NodeHandler
|
where T: NodeHandler
|
||||||
{
|
{
|
||||||
type Handler = Self;
|
type Handler = Self;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn into_handler(self, _: &PeerId) -> Self {
|
fn into_handler(self, _: &TPeerId) -> Self {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Event that can happen on the `HandledNodesTasks`.
|
/// Event that can happen on the `HandledNodesTasks`.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
|
pub enum HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId = PeerId> {
|
||||||
/// A task has been closed.
|
/// A task has been closed.
|
||||||
///
|
///
|
||||||
/// This happens once the node handler closes or an error happens.
|
/// This happens once the node handler closes or an error happens.
|
||||||
@ -165,7 +165,7 @@ pub enum HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
|
|||||||
/// Identifier of the task that succeeded.
|
/// Identifier of the task that succeeded.
|
||||||
id: TaskId,
|
id: TaskId,
|
||||||
/// Identifier of the node.
|
/// Identifier of the node.
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// A task has produced an event.
|
/// A task has produced an event.
|
||||||
@ -181,7 +181,9 @@ pub enum HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
|
|||||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
pub struct TaskId(usize);
|
pub struct TaskId(usize);
|
||||||
|
|
||||||
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
|
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
|
||||||
|
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
|
||||||
|
{
|
||||||
/// Creates a new empty collection.
|
/// Creates a new empty collection.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
@ -202,8 +204,8 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> HandledNodesTask
|
|||||||
/// events.
|
/// events.
|
||||||
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: TIntoHandler) -> TaskId
|
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: TIntoHandler) -> TaskId
|
||||||
where
|
where
|
||||||
TFut: Future<Item = (PeerId, TMuxer), Error = TReachErr> + Send + 'static,
|
TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr> + Send + 'static,
|
||||||
TIntoHandler: IntoNodeHandler + Send + 'static,
|
TIntoHandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
||||||
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
TReachErr: error::Error + Send + 'static,
|
TReachErr: error::Error + Send + 'static,
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
@ -212,6 +214,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> HandledNodesTask
|
|||||||
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
|
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
|
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
|
||||||
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
|
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
|
||||||
|
TPeerId: Send + 'static,
|
||||||
{
|
{
|
||||||
let task_id = self.next_task_id;
|
let task_id = self.next_task_id;
|
||||||
self.next_task_id.0 += 1;
|
self.next_task_id.0 += 1;
|
||||||
@ -264,7 +267,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> HandledNodesTask
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Provides an API similar to `Stream`, except that it cannot produce an error.
|
/// Provides an API similar to `Stream`, except that it cannot produce an error.
|
||||||
pub fn poll(&mut self) -> Async<HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr>> {
|
pub fn poll(&mut self) -> Async<HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>> {
|
||||||
for to_spawn in self.to_spawn.drain() {
|
for to_spawn in self.to_spawn.drain() {
|
||||||
tokio_executor::spawn(to_spawn);
|
tokio_executor::spawn(to_spawn);
|
||||||
}
|
}
|
||||||
@ -350,8 +353,10 @@ impl<'a, TInEvent> fmt::Debug for Task<'a, TInEvent> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> Stream for HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
|
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> Stream for
|
||||||
type Item = HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr>;
|
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
|
||||||
|
{
|
||||||
|
type Item = HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>;
|
||||||
type Error = Void; // TODO: use ! once stable
|
type Error = Void; // TODO: use ! once stable
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -362,9 +367,9 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr> Stream for Handl
|
|||||||
|
|
||||||
/// Message to transmit from a task to the public API.
|
/// Message to transmit from a task to the public API.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
|
enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> {
|
||||||
/// A connection to a node has succeeded.
|
/// A connection to a node has succeeded.
|
||||||
NodeReached(PeerId),
|
NodeReached(TPeerId),
|
||||||
/// The task closed.
|
/// The task closed.
|
||||||
TaskClosed(Result<(), TaskClosedEvent<TReachErr, THandlerErr>>, Option<TIntoHandler>),
|
TaskClosed(Result<(), TaskClosedEvent<TReachErr, THandlerErr>>, Option<TIntoHandler>),
|
||||||
/// An event from the node.
|
/// An event from the node.
|
||||||
@ -373,26 +378,26 @@ enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr> {
|
|||||||
|
|
||||||
/// Implementation of `Future` that handles a single node, and all the communications between
|
/// Implementation of `Future` that handles a single node, and all the communications between
|
||||||
/// the various components of the `HandledNodesTasks`.
|
/// the various components of the `HandledNodesTasks`.
|
||||||
struct NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr>
|
struct NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId>
|
||||||
where
|
where
|
||||||
TMuxer: StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
TIntoHandler: IntoNodeHandler,
|
TIntoHandler: IntoNodeHandler<TPeerId>,
|
||||||
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
|
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
|
||||||
{
|
{
|
||||||
/// Sender to transmit events to the outside.
|
/// Sender to transmit events to the outside.
|
||||||
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, <TIntoHandler::Handler as NodeHandler>::Error>, TaskId)>,
|
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, <TIntoHandler::Handler as NodeHandler>::Error, TPeerId>, TaskId)>,
|
||||||
/// Receiving end for events sent from the main `HandledNodesTasks`.
|
/// Receiving end for events sent from the main `HandledNodesTasks`.
|
||||||
in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<TInEvent>>,
|
in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<TInEvent>>,
|
||||||
/// Inner state of the `NodeTask`.
|
/// Inner state of the `NodeTask`.
|
||||||
inner: NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent>,
|
inner: NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TPeerId>,
|
||||||
/// Identifier of the attempt.
|
/// Identifier of the attempt.
|
||||||
id: TaskId,
|
id: TaskId,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent>
|
enum NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TPeerId>
|
||||||
where
|
where
|
||||||
TMuxer: StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
TIntoHandler: IntoNodeHandler,
|
TIntoHandler: IntoNodeHandler<TPeerId>,
|
||||||
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
|
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
|
||||||
{
|
{
|
||||||
/// Future to resolve to connect to the node.
|
/// Future to resolve to connect to the node.
|
||||||
@ -414,12 +419,12 @@ where
|
|||||||
Poisoned,
|
Poisoned,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr> Future for
|
impl<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId> Future for
|
||||||
NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr>
|
NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId>
|
||||||
where
|
where
|
||||||
TMuxer: StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
TFut: Future<Item = (PeerId, TMuxer), Error = TReachErr>,
|
TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr>,
|
||||||
TIntoHandler: IntoNodeHandler,
|
TIntoHandler: IntoNodeHandler<TPeerId>,
|
||||||
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
|
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Item = ();
|
||||||
|
@ -42,6 +42,7 @@ type TestNodeTask = NodeTask<
|
|||||||
InEvent,
|
InEvent,
|
||||||
OutEvent,
|
OutEvent,
|
||||||
io::Error,
|
io::Error,
|
||||||
|
PeerId,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
struct NodeTaskTestBuilder {
|
struct NodeTaskTestBuilder {
|
||||||
@ -75,9 +76,9 @@ impl NodeTaskTestBuilder {
|
|||||||
fn node_task(&mut self) -> (
|
fn node_task(&mut self) -> (
|
||||||
TestNodeTask,
|
TestNodeTask,
|
||||||
UnboundedSender<InEvent>,
|
UnboundedSender<InEvent>,
|
||||||
UnboundedReceiver<(InToExtMessage<OutEvent, Handler, io::Error, io::Error>, TaskId)>,
|
UnboundedReceiver<(InToExtMessage<OutEvent, Handler, io::Error, io::Error, PeerId>, TaskId)>,
|
||||||
) {
|
) {
|
||||||
let (events_from_node_task_tx, events_from_node_task_rx) = mpsc::unbounded::<(InToExtMessage<OutEvent, Handler, _, _>, TaskId)>();
|
let (events_from_node_task_tx, events_from_node_task_rx) = mpsc::unbounded::<(InToExtMessage<OutEvent, Handler, _, _, _>, TaskId)>();
|
||||||
let (events_to_node_task_tx, events_to_node_task_rx) = mpsc::unbounded::<InEvent>();
|
let (events_to_node_task_tx, events_to_node_task_rx) = mpsc::unbounded::<InEvent>();
|
||||||
let inner = if self.inner_node.is_some() {
|
let inner = if self.inner_node.is_some() {
|
||||||
NodeTaskInner::Node(self.inner_node.take().unwrap())
|
NodeTaskInner::Node(self.inner_node.take().unwrap())
|
||||||
@ -285,7 +286,7 @@ fn iterate_over_all_tasks() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn add_reach_attempt_prepares_a_new_task() {
|
fn add_reach_attempt_prepares_a_new_task() {
|
||||||
let mut handled_nodes = HandledNodesTasks::new();
|
let mut handled_nodes: HandledNodesTasks<_, _, _, _, _> = HandledNodesTasks::new();
|
||||||
assert_eq!(handled_nodes.tasks().count(), 0);
|
assert_eq!(handled_nodes.tasks().count(), 0);
|
||||||
assert_eq!(handled_nodes.to_spawn.len(), 0);
|
assert_eq!(handled_nodes.to_spawn.len(), 0);
|
||||||
|
|
||||||
|
@ -47,13 +47,13 @@ use std::{
|
|||||||
collections::hash_map::{Entry, OccupiedEntry},
|
collections::hash_map::{Entry, OccupiedEntry},
|
||||||
error,
|
error,
|
||||||
fmt,
|
fmt,
|
||||||
|
hash::Hash,
|
||||||
};
|
};
|
||||||
|
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
/// Implementation of `Stream` that handles the nodes.
|
/// Implementation of `Stream` that handles the nodes.
|
||||||
#[derive(Debug)]
|
pub struct RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId = PeerId>
|
||||||
pub struct RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
{
|
{
|
||||||
@ -61,30 +61,59 @@ where
|
|||||||
listeners: ListenersStream<TTrans>,
|
listeners: ListenersStream<TTrans>,
|
||||||
|
|
||||||
/// The nodes currently active.
|
/// The nodes currently active.
|
||||||
active_nodes: CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error>, THandlerErr>,
|
active_nodes: CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, TPeerId>,
|
||||||
|
|
||||||
/// The reach attempts of the swarm.
|
/// The reach attempts of the swarm.
|
||||||
/// This needs to be a separate struct in order to handle multiple mutable borrows issues.
|
/// This needs to be a separate struct in order to handle multiple mutable borrows issues.
|
||||||
reach_attempts: ReachAttempts,
|
reach_attempts: ReachAttempts<TPeerId>,
|
||||||
|
|
||||||
/// Max numer of incoming connections.
|
/// Max numer of incoming connections.
|
||||||
incoming_limit: Option<u32>,
|
incoming_limit: Option<u32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
impl<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for
|
||||||
struct ReachAttempts {
|
RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
|
where
|
||||||
|
TTrans: Transport + fmt::Debug,
|
||||||
|
TPeerId: fmt::Debug + Eq + Hash,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
|
f.debug_struct("ReachAttempts")
|
||||||
|
.field("listeners", &self.listeners)
|
||||||
|
.field("active_nodes", &self.active_nodes)
|
||||||
|
.field("reach_attempts", &self.reach_attempts)
|
||||||
|
.field("incoming_limit", &self.incoming_limit)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ReachAttempts<TPeerId> {
|
||||||
/// Peer ID of the node we control.
|
/// Peer ID of the node we control.
|
||||||
local_peer_id: PeerId,
|
local_peer_id: TPeerId,
|
||||||
|
|
||||||
/// Attempts to reach a peer.
|
/// Attempts to reach a peer.
|
||||||
out_reach_attempts: FnvHashMap<PeerId, OutReachAttempt>,
|
out_reach_attempts: FnvHashMap<TPeerId, OutReachAttempt>,
|
||||||
|
|
||||||
/// Reach attempts for incoming connections, and outgoing connections for which we don't know
|
/// Reach attempts for incoming connections, and outgoing connections for which we don't know
|
||||||
/// the peer ID.
|
/// the peer ID.
|
||||||
other_reach_attempts: Vec<(ReachAttemptId, ConnectedPoint)>,
|
other_reach_attempts: Vec<(ReachAttemptId, ConnectedPoint)>,
|
||||||
|
|
||||||
/// For each peer ID we're connected to, contains the endpoint we're connected to.
|
/// For each peer ID we're connected to, contains the endpoint we're connected to.
|
||||||
connected_points: FnvHashMap<PeerId, ConnectedPoint>,
|
connected_points: FnvHashMap<TPeerId, ConnectedPoint>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TPeerId> fmt::Debug for ReachAttempts<TPeerId>
|
||||||
|
where
|
||||||
|
TPeerId: fmt::Debug + Eq + Hash,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
|
f.debug_struct("ReachAttempts")
|
||||||
|
.field("local_peer_id", &self.local_peer_id)
|
||||||
|
.field("out_reach_attempts", &self.out_reach_attempts)
|
||||||
|
.field("other_reach_attempts", &self.other_reach_attempts)
|
||||||
|
.field("connected_points", &self.connected_points)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Attempt to reach a peer.
|
/// Attempt to reach a peer.
|
||||||
@ -99,7 +128,7 @@ struct OutReachAttempt {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Event that can happen on the `RawSwarm`.
|
/// Event that can happen on the `RawSwarm`.
|
||||||
pub enum RawSwarmEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a>
|
pub enum RawSwarmEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a = PeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
{
|
{
|
||||||
@ -114,7 +143,7 @@ where
|
|||||||
},
|
},
|
||||||
|
|
||||||
/// A new connection arrived on a listener.
|
/// A new connection arrived on a listener.
|
||||||
IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>),
|
IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
|
||||||
|
|
||||||
/// A new connection was arriving on a listener, but an error happened when negotiating it.
|
/// A new connection was arriving on a listener, but an error happened when negotiating it.
|
||||||
///
|
///
|
||||||
@ -132,7 +161,7 @@ where
|
|||||||
/// A new connection to a peer has been opened.
|
/// A new connection to a peer has been opened.
|
||||||
Connected {
|
Connected {
|
||||||
/// Id of the peer.
|
/// Id of the peer.
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
/// If `Listener`, then we received the connection. If `Dial`, then it's a connection that
|
/// If `Listener`, then we received the connection. If `Dial`, then it's a connection that
|
||||||
/// we opened.
|
/// we opened.
|
||||||
endpoint: ConnectedPoint,
|
endpoint: ConnectedPoint,
|
||||||
@ -141,7 +170,7 @@ where
|
|||||||
/// A connection to a peer has been replaced with a new one.
|
/// A connection to a peer has been replaced with a new one.
|
||||||
Replaced {
|
Replaced {
|
||||||
/// Id of the peer.
|
/// Id of the peer.
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
/// Endpoint we were connected to.
|
/// Endpoint we were connected to.
|
||||||
closed_endpoint: ConnectedPoint,
|
closed_endpoint: ConnectedPoint,
|
||||||
/// If `Listener`, then we received the connection. If `Dial`, then it's a connection that
|
/// If `Listener`, then we received the connection. If `Dial`, then it's a connection that
|
||||||
@ -155,7 +184,7 @@ where
|
|||||||
/// substream attempt is pending.
|
/// substream attempt is pending.
|
||||||
NodeClosed {
|
NodeClosed {
|
||||||
/// Identifier of the node.
|
/// Identifier of the node.
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
/// Endpoint we were connected to.
|
/// Endpoint we were connected to.
|
||||||
endpoint: ConnectedPoint,
|
endpoint: ConnectedPoint,
|
||||||
},
|
},
|
||||||
@ -163,7 +192,7 @@ where
|
|||||||
/// The handler of a node has produced an error.
|
/// The handler of a node has produced an error.
|
||||||
NodeError {
|
NodeError {
|
||||||
/// Identifier of the node.
|
/// Identifier of the node.
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
/// Endpoint we were connected to.
|
/// Endpoint we were connected to.
|
||||||
endpoint: ConnectedPoint,
|
endpoint: ConnectedPoint,
|
||||||
/// The error that happened.
|
/// The error that happened.
|
||||||
@ -178,13 +207,13 @@ where
|
|||||||
remain_addrs_attempt: usize,
|
remain_addrs_attempt: usize,
|
||||||
|
|
||||||
/// Id of the peer we were trying to dial.
|
/// Id of the peer we were trying to dial.
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
|
|
||||||
/// The multiaddr we failed to reach.
|
/// The multiaddr we failed to reach.
|
||||||
multiaddr: Multiaddr,
|
multiaddr: Multiaddr,
|
||||||
|
|
||||||
/// The error that happened.
|
/// The error that happened.
|
||||||
error: RawSwarmReachError<TTrans::Error>,
|
error: RawSwarmReachError<TTrans::Error, TPeerId>,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Failed to reach a peer that we were trying to dial.
|
/// Failed to reach a peer that we were trying to dial.
|
||||||
@ -202,18 +231,20 @@ where
|
|||||||
/// A node produced a custom event.
|
/// A node produced a custom event.
|
||||||
NodeEvent {
|
NodeEvent {
|
||||||
/// Id of the node that produced the event.
|
/// Id of the node that produced the event.
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
/// Event that was produced by the node.
|
/// Event that was produced by the node.
|
||||||
event: TOutEvent,
|
event: TOutEvent,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> fmt::Debug for RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for
|
||||||
|
RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
where
|
where
|
||||||
TOutEvent: fmt::Debug,
|
TOutEvent: fmt::Debug,
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
TTrans::Error: fmt::Debug,
|
TTrans::Error: fmt::Debug,
|
||||||
THandlerErr: fmt::Debug,
|
THandlerErr: fmt::Debug,
|
||||||
|
TPeerId: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
match *self {
|
match *self {
|
||||||
@ -288,27 +319,29 @@ where
|
|||||||
|
|
||||||
/// Internal error type that contains all the possible errors that can happen in a reach attempt.
|
/// Internal error type that contains all the possible errors that can happen in a reach attempt.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum InternalReachErr<TTransErr> {
|
enum InternalReachErr<TTransErr, TPeerId> {
|
||||||
/// Error in the transport layer.
|
/// Error in the transport layer.
|
||||||
Transport(TransportError<TTransErr>),
|
Transport(TransportError<TTransErr>),
|
||||||
/// We successfully reached the peer, but there was a mismatch between the expected id and the
|
/// We successfully reached the peer, but there was a mismatch between the expected id and the
|
||||||
/// actual id of the peer.
|
/// actual id of the peer.
|
||||||
PeerIdMismatch {
|
PeerIdMismatch {
|
||||||
/// The peer id that the node reports.
|
/// The peer id that the node reports.
|
||||||
obtained: PeerId,
|
obtained: TPeerId,
|
||||||
},
|
},
|
||||||
/// The negotiated `PeerId` is the same as the one of the local node.
|
/// The negotiated `PeerId` is the same as the one of the local node.
|
||||||
FoundLocalPeerId,
|
FoundLocalPeerId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTransErr> fmt::Display for InternalReachErr<TTransErr>
|
impl<TTransErr, TPeerId> fmt::Display for InternalReachErr<TTransErr, TPeerId>
|
||||||
where TTransErr: fmt::Display
|
where
|
||||||
|
TTransErr: fmt::Display,
|
||||||
|
TPeerId: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
InternalReachErr::Transport(err) => write!(f, "{}", err),
|
InternalReachErr::Transport(err) => write!(f, "{}", err),
|
||||||
InternalReachErr::PeerIdMismatch { obtained } => {
|
InternalReachErr::PeerIdMismatch { obtained } => {
|
||||||
write!(f, "Peer ID mismatch, obtained: {}", obtained.to_base58())
|
write!(f, "Peer ID mismatch, obtained: {:?}", obtained)
|
||||||
},
|
},
|
||||||
InternalReachErr::FoundLocalPeerId => {
|
InternalReachErr::FoundLocalPeerId => {
|
||||||
write!(f, "Remote has the same PeerId as us")
|
write!(f, "Remote has the same PeerId as us")
|
||||||
@ -317,8 +350,10 @@ where TTransErr: fmt::Display
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTransErr> error::Error for InternalReachErr<TTransErr>
|
impl<TTransErr, TPeerId> error::Error for InternalReachErr<TTransErr, TPeerId>
|
||||||
where TTransErr: error::Error + 'static
|
where
|
||||||
|
TTransErr: error::Error + 'static,
|
||||||
|
TPeerId: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
match self {
|
match self {
|
||||||
@ -331,7 +366,7 @@ where TTransErr: error::Error + 'static
|
|||||||
|
|
||||||
/// Error that can happen when trying to reach a node.
|
/// Error that can happen when trying to reach a node.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum RawSwarmReachError<TTransErr> {
|
pub enum RawSwarmReachError<TTransErr, TPeerId> {
|
||||||
/// Error in the transport layer.
|
/// Error in the transport layer.
|
||||||
Transport(TransportError<TTransErr>),
|
Transport(TransportError<TTransErr>),
|
||||||
|
|
||||||
@ -339,25 +374,29 @@ pub enum RawSwarmReachError<TTransErr> {
|
|||||||
/// actual id of the peer.
|
/// actual id of the peer.
|
||||||
PeerIdMismatch {
|
PeerIdMismatch {
|
||||||
/// The peer id that the node reports.
|
/// The peer id that the node reports.
|
||||||
obtained: PeerId,
|
obtained: TPeerId,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTransErr> fmt::Display for RawSwarmReachError<TTransErr>
|
impl<TTransErr, TPeerId> fmt::Display for RawSwarmReachError<TTransErr, TPeerId>
|
||||||
where TTransErr: fmt::Display
|
where
|
||||||
|
TTransErr: fmt::Display,
|
||||||
|
TPeerId: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
RawSwarmReachError::Transport(err) => write!(f, "{}", err),
|
RawSwarmReachError::Transport(err) => write!(f, "{}", err),
|
||||||
RawSwarmReachError::PeerIdMismatch { obtained } => {
|
RawSwarmReachError::PeerIdMismatch { obtained } => {
|
||||||
write!(f, "Peer ID mismatch, obtained: {}", obtained.to_base58())
|
write!(f, "Peer ID mismatch, obtained: {:?}", obtained)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTransErr> error::Error for RawSwarmReachError<TTransErr>
|
impl<TTransErr, TPeerId> error::Error for RawSwarmReachError<TTransErr, TPeerId>
|
||||||
where TTransErr: error::Error + 'static
|
where
|
||||||
|
TTransErr: error::Error + 'static,
|
||||||
|
TPeerId: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
match self {
|
match self {
|
||||||
@ -442,30 +481,30 @@ where TTransErr: error::Error + 'static
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// A new connection arrived on a listener.
|
/// A new connection arrived on a listener.
|
||||||
pub struct IncomingConnectionEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a>
|
pub struct IncomingConnectionEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a>
|
||||||
where TTrans: Transport
|
where TTrans: Transport
|
||||||
{
|
{
|
||||||
/// The produced upgrade.
|
/// The produced upgrade.
|
||||||
upgrade: TTrans::ListenerUpgrade,
|
upgrade: TTrans::ListenerUpgrade,
|
||||||
/// PeerId of the local node.
|
/// PeerId of the local node.
|
||||||
local_peer_id: PeerId,
|
local_peer_id: TPeerId,
|
||||||
/// Address of the listener which received the connection.
|
/// Address of the listener which received the connection.
|
||||||
listen_addr: Multiaddr,
|
listen_addr: Multiaddr,
|
||||||
/// Address used to send back data to the remote.
|
/// Address used to send back data to the remote.
|
||||||
send_back_addr: Multiaddr,
|
send_back_addr: Multiaddr,
|
||||||
/// Reference to the `active_nodes` field of the swarm.
|
/// Reference to the `active_nodes` field of the swarm.
|
||||||
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error>, THandlerErr>,
|
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, TPeerId>,
|
||||||
/// Reference to the `other_reach_attempts` field of the swarm.
|
/// Reference to the `other_reach_attempts` field of the swarm.
|
||||||
other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>,
|
other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr>
|
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TPeerId>
|
||||||
IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)>,
|
TTrans: Transport<Output = (TPeerId, TMuxer)>,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TTrans::ListenerUpgrade: Send + 'static,
|
TTrans::ListenerUpgrade: Send + 'static,
|
||||||
THandler: IntoNodeHandler + Send + 'static,
|
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
@ -474,6 +513,7 @@ where
|
|||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TInEvent: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
|
TPeerId: fmt::Debug + Eq + Hash + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
/// Starts processing the incoming connection and sets the handler to use for it.
|
/// Starts processing the incoming connection and sets the handler to use for it.
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -505,7 +545,8 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
|
IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
where TTrans: Transport
|
where TTrans: Transport
|
||||||
{
|
{
|
||||||
/// Returns the `IncomingInfo` corresponding to this incoming connection.
|
/// Returns the `IncomingInfo` corresponding to this incoming connection.
|
||||||
@ -617,19 +658,20 @@ impl<'a> IncomingInfo<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr>
|
impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TPeerId>
|
||||||
RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport + Clone,
|
TTrans: Transport + Clone,
|
||||||
TMuxer: StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
THandler: IntoNodeHandler + Send + 'static,
|
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
|
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
||||||
{
|
{
|
||||||
/// Creates a new node events stream.
|
/// Creates a new node events stream.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new(transport: TTrans, local_peer_id: PeerId) -> Self {
|
pub fn new(transport: TTrans, local_peer_id: TPeerId) -> Self {
|
||||||
// TODO: with_capacity?
|
// TODO: with_capacity?
|
||||||
RawSwarm {
|
RawSwarm {
|
||||||
listeners: ListenersStream::new(transport),
|
listeners: ListenersStream::new(transport),
|
||||||
@ -647,7 +689,7 @@ where
|
|||||||
/// Creates a new node event stream with incoming connections limit.
|
/// Creates a new node event stream with incoming connections limit.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new_with_incoming_limit(transport: TTrans,
|
pub fn new_with_incoming_limit(transport: TTrans,
|
||||||
local_peer_id: PeerId, incoming_limit: Option<u32>) -> Self
|
local_peer_id: TPeerId, incoming_limit: Option<u32>) -> Self
|
||||||
{
|
{
|
||||||
RawSwarm {
|
RawSwarm {
|
||||||
incoming_limit,
|
incoming_limit,
|
||||||
@ -709,7 +751,7 @@ where
|
|||||||
///
|
///
|
||||||
/// This is the same value as was passed to `new()`.
|
/// This is the same value as was passed to `new()`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn local_peer_id(&self) -> &PeerId {
|
pub fn local_peer_id(&self) -> &TPeerId {
|
||||||
&self.reach_attempts.local_peer_id
|
&self.reach_attempts.local_peer_id
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -718,7 +760,7 @@ where
|
|||||||
/// The second parameter is the handler to use if we manage to reach a node.
|
/// The second parameter is the handler to use if we manage to reach a node.
|
||||||
pub fn dial(&mut self, addr: Multiaddr, handler: THandler) -> Result<(), TransportError<TTrans::Error>>
|
pub fn dial(&mut self, addr: Multiaddr, handler: THandler) -> Result<(), TransportError<TTrans::Error>>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)>,
|
TTrans: Transport<Output = (TPeerId, TMuxer)>,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
@ -788,7 +830,7 @@ where
|
|||||||
|
|
||||||
/// Grants access to a struct that represents a peer.
|
/// Grants access to a struct that represents a peer.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn peer(&mut self, peer_id: PeerId) -> Peer<TTrans, TInEvent, TOutEvent, THandler, THandlerErr> {
|
pub fn peer(&mut self, peer_id: TPeerId) -> Peer<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> {
|
||||||
if peer_id == self.reach_attempts.local_peer_id {
|
if peer_id == self.reach_attempts.local_peer_id {
|
||||||
return Peer::LocalNode;
|
return Peer::LocalNode;
|
||||||
}
|
}
|
||||||
@ -831,9 +873,9 @@ where
|
|||||||
///
|
///
|
||||||
/// It is a logic error to call this method if we already have an outgoing attempt to the
|
/// It is a logic error to call this method if we already have an outgoing attempt to the
|
||||||
/// given peer.
|
/// given peer.
|
||||||
fn start_dial_out(&mut self, peer_id: PeerId, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
|
fn start_dial_out(&mut self, peer_id: TPeerId, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)>,
|
TTrans: Transport<Output = (TPeerId, TMuxer)>,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
@ -875,9 +917,9 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Provides an API similar to `Stream`, except that it cannot error.
|
/// Provides an API similar to `Stream`, except that it cannot error.
|
||||||
pub fn poll(&mut self) -> Async<RawSwarmEvent<TTrans, TInEvent, TOutEvent, THandler, THandlerErr>>
|
pub fn poll(&mut self) -> Async<RawSwarmEvent<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)>,
|
TTrans: Transport<Output = (TPeerId, TMuxer)>,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::ListenerUpgrade: Send + 'static,
|
TTrans::ListenerUpgrade: Send + 'static,
|
||||||
@ -886,7 +928,7 @@ where
|
|||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TInEvent: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
THandler: IntoNodeHandler + Send + 'static,
|
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
@ -999,12 +1041,12 @@ where
|
|||||||
/// Internal struct indicating an action to perform of the swarm.
|
/// Internal struct indicating an action to perform of the swarm.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
#[must_use]
|
#[must_use]
|
||||||
struct ActionItem<THandler> {
|
struct ActionItem<THandler, TPeerId> {
|
||||||
start_dial_out: Option<(PeerId, THandler, Multiaddr, Vec<Multiaddr>)>,
|
start_dial_out: Option<(TPeerId, THandler, Multiaddr, Vec<Multiaddr>)>,
|
||||||
interrupt: Option<ReachAttemptId>,
|
interrupt: Option<ReachAttemptId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<THandler> Default for ActionItem<THandler> {
|
impl<THandler, TPeerId> Default for ActionItem<THandler, TPeerId> {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
ActionItem {
|
ActionItem {
|
||||||
start_dial_out: None,
|
start_dial_out: None,
|
||||||
@ -1019,17 +1061,18 @@ impl<THandler> Default for ActionItem<THandler> {
|
|||||||
///
|
///
|
||||||
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
|
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
|
||||||
/// > panics will likely happen.
|
/// > panics will likely happen.
|
||||||
fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr>(
|
fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>(
|
||||||
reach_attempts: &mut ReachAttempts,
|
reach_attempts: &mut ReachAttempts<TPeerId>,
|
||||||
event: CollectionReachEvent<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error>, THandlerErr>
|
event: CollectionReachEvent<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, TPeerId>,
|
||||||
) -> (ActionItem<THandler>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>)
|
) -> (ActionItem<THandler, TPeerId>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>)
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (TPeerId, TMuxer)> + Clone,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TInEvent: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
|
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
||||||
{
|
{
|
||||||
// We first start looking in the incoming attempts. While this makes the code less optimal,
|
// We first start looking in the incoming attempts. While this makes the code less optimal,
|
||||||
// it also makes the logic easier.
|
// it also makes the logic easier.
|
||||||
@ -1137,8 +1180,11 @@ where
|
|||||||
/// This means that if `local` and `other` both dial each other, the connection from `local` should
|
/// This means that if `local` and `other` both dial each other, the connection from `local` should
|
||||||
/// be kept and the one from `other` will be dropped.
|
/// be kept and the one from `other` will be dropped.
|
||||||
#[inline]
|
#[inline]
|
||||||
fn has_dial_prio(local: &PeerId, other: &PeerId) -> bool {
|
fn has_dial_prio<TPeerId>(local: &TPeerId, other: &TPeerId) -> bool
|
||||||
local.as_bytes() < other.as_bytes()
|
where
|
||||||
|
TPeerId: AsRef<[u8]>,
|
||||||
|
{
|
||||||
|
local.as_ref() < other.as_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handles a reach error event from the collection.
|
/// Handles a reach error event from the collection.
|
||||||
@ -1147,13 +1193,15 @@ fn has_dial_prio(local: &PeerId, other: &PeerId) -> bool {
|
|||||||
///
|
///
|
||||||
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
|
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
|
||||||
/// > panics will likely happen.
|
/// > panics will likely happen.
|
||||||
fn handle_reach_error<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>(
|
fn handle_reach_error<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>(
|
||||||
reach_attempts: &mut ReachAttempts,
|
reach_attempts: &mut ReachAttempts<TPeerId>,
|
||||||
reach_id: ReachAttemptId,
|
reach_id: ReachAttemptId,
|
||||||
error: InternalReachErr<TTrans::Error>,
|
error: InternalReachErr<TTrans::Error, TPeerId>,
|
||||||
handler: THandler,
|
handler: THandler,
|
||||||
) -> (ActionItem<THandler>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>)
|
) -> (ActionItem<THandler, TPeerId>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>)
|
||||||
where TTrans: Transport
|
where
|
||||||
|
TTrans: Transport,
|
||||||
|
TPeerId: Eq + Hash + Clone,
|
||||||
{
|
{
|
||||||
// Search for the attempt in `out_reach_attempts`.
|
// Search for the attempt in `out_reach_attempts`.
|
||||||
// TODO: could be more optimal than iterating over everything
|
// TODO: could be more optimal than iterating over everything
|
||||||
@ -1251,29 +1299,31 @@ where TTrans: Transport
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// State of a peer in the system.
|
/// State of a peer in the system.
|
||||||
pub enum Peer<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a>
|
pub enum Peer<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
{
|
{
|
||||||
/// We are connected to this peer.
|
/// We are connected to this peer.
|
||||||
Connected(PeerConnected<'a, TInEvent>),
|
Connected(PeerConnected<'a, TInEvent, TPeerId>),
|
||||||
|
|
||||||
/// We are currently attempting to connect to this peer.
|
/// We are currently attempting to connect to this peer.
|
||||||
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>),
|
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
|
||||||
|
|
||||||
/// We are not connected to this peer at all.
|
/// We are not connected to this peer at all.
|
||||||
///
|
///
|
||||||
/// > **Note**: It is however possible that a pending incoming connection is being negotiated
|
/// > **Note**: It is however possible that a pending incoming connection is being negotiated
|
||||||
/// > and will connect to this peer, but we don't know it yet.
|
/// > and will connect to this peer, but we don't know it yet.
|
||||||
NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>),
|
NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
|
||||||
|
|
||||||
/// The requested peer is the local node.
|
/// The requested peer is the local node.
|
||||||
LocalNode,
|
LocalNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> fmt::Debug for Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for
|
||||||
|
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
|
TPeerId: Eq + Hash + fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
match *self {
|
match *self {
|
||||||
@ -1302,10 +1352,10 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add other similar methods that wrap to the ones of `PeerNotConnected`
|
// TODO: add other similar methods that wrap to the ones of `PeerNotConnected`
|
||||||
impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr>
|
impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (TPeerId, TMuxer)> + Clone,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
@ -1313,14 +1363,15 @@ where
|
|||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TInEvent: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
THandler: IntoNodeHandler + Send + 'static,
|
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
|
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
||||||
{
|
{
|
||||||
/// If we are connected, returns the `PeerConnected`.
|
/// If we are connected, returns the `PeerConnected`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_connected(self) -> Option<PeerConnected<'a, TInEvent>> {
|
pub fn as_connected(self) -> Option<PeerConnected<'a, TInEvent, TPeerId>> {
|
||||||
match self {
|
match self {
|
||||||
Peer::Connected(peer) => Some(peer),
|
Peer::Connected(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -1329,7 +1380,7 @@ where
|
|||||||
|
|
||||||
/// If a connection is pending, returns the `PeerPendingConnect`.
|
/// If a connection is pending, returns the `PeerPendingConnect`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>> {
|
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>> {
|
||||||
match self {
|
match self {
|
||||||
Peer::PendingConnect(peer) => Some(peer),
|
Peer::PendingConnect(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -1338,7 +1389,7 @@ where
|
|||||||
|
|
||||||
/// If we are not connected, returns the `PeerNotConnected`.
|
/// If we are not connected, returns the `PeerNotConnected`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_not_connected(self) -> Option<PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>> {
|
pub fn as_not_connected(self) -> Option<PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>> {
|
||||||
match self {
|
match self {
|
||||||
Peer::NotConnected(peer) => Some(peer),
|
Peer::NotConnected(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -1353,7 +1404,7 @@ where
|
|||||||
/// Returns an error if we are `LocalNode`.
|
/// Returns an error if we are `LocalNode`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn or_connect(self, addr: Multiaddr, handler: THandler)
|
pub fn or_connect(self, addr: Multiaddr, handler: THandler)
|
||||||
-> Result<PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
|
-> Result<PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>, Self>
|
||||||
{
|
{
|
||||||
self.or_connect_with(move |_| addr, handler)
|
self.or_connect_with(move |_| addr, handler)
|
||||||
}
|
}
|
||||||
@ -1367,9 +1418,9 @@ where
|
|||||||
/// Returns an error if we are `LocalNode`.
|
/// Returns an error if we are `LocalNode`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn or_connect_with<TFn>(self, addr: TFn, handler: THandler)
|
pub fn or_connect_with<TFn>(self, addr: TFn, handler: THandler)
|
||||||
-> Result<PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
|
-> Result<PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>, Self>
|
||||||
where
|
where
|
||||||
TFn: FnOnce(&PeerId) -> Multiaddr,
|
TFn: FnOnce(&TPeerId) -> Multiaddr,
|
||||||
{
|
{
|
||||||
match self {
|
match self {
|
||||||
Peer::Connected(peer) => Ok(PeerPotentialConnect::Connected(peer)),
|
Peer::Connected(peer) => Ok(PeerPotentialConnect::Connected(peer)),
|
||||||
@ -1384,21 +1435,22 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Peer we are potentially going to connect to.
|
/// Peer we are potentially going to connect to.
|
||||||
pub enum PeerPotentialConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a>
|
pub enum PeerPotentialConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a>
|
||||||
where
|
where
|
||||||
TTrans: Transport
|
TTrans: Transport
|
||||||
{
|
{
|
||||||
/// We are connected to this peer.
|
/// We are connected to this peer.
|
||||||
Connected(PeerConnected<'a, TInEvent>),
|
Connected(PeerConnected<'a, TInEvent, TPeerId>),
|
||||||
|
|
||||||
/// We are currently attempting to connect to this peer.
|
/// We are currently attempting to connect to this peer.
|
||||||
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>),
|
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport
|
TTrans: Transport,
|
||||||
|
TPeerId: Eq + Hash + Clone,
|
||||||
{
|
{
|
||||||
/// Closes the connection or the connection attempt.
|
/// Closes the connection or the connection attempt.
|
||||||
// TODO: consider returning a `PeerNotConnected`
|
// TODO: consider returning a `PeerNotConnected`
|
||||||
@ -1412,7 +1464,7 @@ where
|
|||||||
|
|
||||||
/// If we are connected, returns the `PeerConnected`.
|
/// If we are connected, returns the `PeerConnected`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_connected(self) -> Option<PeerConnected<'a, TInEvent>> {
|
pub fn as_connected(self) -> Option<PeerConnected<'a, TInEvent, TPeerId>> {
|
||||||
match self {
|
match self {
|
||||||
PeerPotentialConnect::Connected(peer) => Some(peer),
|
PeerPotentialConnect::Connected(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -1421,7 +1473,7 @@ where
|
|||||||
|
|
||||||
/// If a connection is pending, returns the `PeerPendingConnect`.
|
/// If a connection is pending, returns the `PeerPendingConnect`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>> {
|
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>> {
|
||||||
match self {
|
match self {
|
||||||
PeerPotentialConnect::PendingConnect(peer) => Some(peer),
|
PeerPotentialConnect::PendingConnect(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -1430,14 +1482,17 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Access to a peer we are connected to.
|
/// Access to a peer we are connected to.
|
||||||
pub struct PeerConnected<'a, TInEvent: 'a> {
|
pub struct PeerConnected<'a, TInEvent: 'a, TPeerId: 'a> {
|
||||||
peer: CollecPeerMut<'a, TInEvent>,
|
peer: CollecPeerMut<'a, TInEvent, TPeerId>,
|
||||||
/// Reference to the `connected_points` field of the parent.
|
/// Reference to the `connected_points` field of the parent.
|
||||||
connected_points: &'a mut FnvHashMap<PeerId, ConnectedPoint>,
|
connected_points: &'a mut FnvHashMap<TPeerId, ConnectedPoint>,
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TInEvent> PeerConnected<'a, TInEvent> {
|
impl<'a, TInEvent, TPeerId> PeerConnected<'a, TInEvent, TPeerId>
|
||||||
|
where
|
||||||
|
TPeerId: Eq + Hash,
|
||||||
|
{
|
||||||
/// Closes the connection to this node.
|
/// Closes the connection to this node.
|
||||||
///
|
///
|
||||||
/// No `NodeClosed` message will be generated for this node.
|
/// No `NodeClosed` message will be generated for this node.
|
||||||
@ -1467,17 +1522,19 @@ impl<'a, TInEvent> PeerConnected<'a, TInEvent> {
|
|||||||
|
|
||||||
/// Access to a peer we are attempting to connect to.
|
/// Access to a peer we are attempting to connect to.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct PeerPendingConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a>
|
pub struct PeerPendingConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a>
|
||||||
where
|
where
|
||||||
TTrans: Transport
|
TTrans: Transport
|
||||||
{
|
{
|
||||||
attempt: OccupiedEntry<'a, PeerId, OutReachAttempt>,
|
attempt: OccupiedEntry<'a, TPeerId, OutReachAttempt>,
|
||||||
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error>, THandlerErr>,
|
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, TPeerId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
|
PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport
|
TTrans: Transport,
|
||||||
|
TPeerId: Eq + Hash + Clone,
|
||||||
{
|
{
|
||||||
/// Interrupt this connection attempt.
|
/// Interrupt this connection attempt.
|
||||||
// TODO: consider returning a PeerNotConnected; however that is really pain in terms of
|
// TODO: consider returning a PeerNotConnected; however that is really pain in terms of
|
||||||
@ -1520,25 +1577,37 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Access to a peer we're not connected to.
|
/// Access to a peer we're not connected to.
|
||||||
#[derive(Debug)]
|
pub struct PeerNotConnected<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a>
|
||||||
pub struct PeerNotConnected<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a>
|
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
{
|
{
|
||||||
peer_id: PeerId,
|
peer_id: TPeerId,
|
||||||
nodes: &'a mut RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr>,
|
nodes: &'a mut RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr>
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for
|
||||||
PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
TTrans: Transport,
|
||||||
|
TPeerId: fmt::Debug,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||||
|
f.debug_struct("PeerNotConnected")
|
||||||
|
.field("peer_id", &self.peer_id)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TPeerId>
|
||||||
|
PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
|
where
|
||||||
|
TTrans: Transport<Output = (TPeerId, TMuxer)> + Clone,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
THandler: IntoNodeHandler + Send + 'static,
|
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
@ -1551,7 +1620,9 @@ where
|
|||||||
/// the whole connection is immediately closed.
|
/// the whole connection is immediately closed.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn connect(self, addr: Multiaddr, handler: THandler)
|
pub fn connect(self, addr: Multiaddr, handler: THandler)
|
||||||
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
|
where
|
||||||
|
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
||||||
{
|
{
|
||||||
self.connect_inner(handler, addr, Vec::new())
|
self.connect_inner(handler, addr, Vec::new())
|
||||||
}
|
}
|
||||||
@ -1566,9 +1637,10 @@ where
|
|||||||
/// the whole connection is immediately closed.
|
/// the whole connection is immediately closed.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn connect_iter<TIter>(self, addrs: TIter, handler: THandler)
|
pub fn connect_iter<TIter>(self, addrs: TIter, handler: THandler)
|
||||||
-> Result<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
|
-> Result<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>, Self>
|
||||||
where
|
where
|
||||||
TIter: IntoIterator<Item = Multiaddr>,
|
TIter: IntoIterator<Item = Multiaddr>,
|
||||||
|
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
||||||
{
|
{
|
||||||
let mut addrs = addrs.into_iter();
|
let mut addrs = addrs.into_iter();
|
||||||
let first = match addrs.next() {
|
let first = match addrs.next() {
|
||||||
@ -1581,7 +1653,9 @@ where
|
|||||||
|
|
||||||
/// Inner implementation of `connect`.
|
/// Inner implementation of `connect`.
|
||||||
fn connect_inner(self, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
|
fn connect_inner(self, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
|
||||||
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
|
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
||||||
|
where
|
||||||
|
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
||||||
{
|
{
|
||||||
self.nodes.start_dial_out(self.peer_id.clone(), handler, first, rest);
|
self.nodes.start_dial_out(self.peer_id.clone(), handler, first, rest);
|
||||||
PeerPendingConnect {
|
PeerPendingConnect {
|
||||||
|
@ -155,6 +155,13 @@ impl AsRef<multihash::Multihash> for PeerId {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl AsRef<[u8]> for PeerId {
|
||||||
|
#[inline]
|
||||||
|
fn as_ref(&self) -> &[u8] {
|
||||||
|
self.as_bytes()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Into<multihash::Multihash> for PeerId {
|
impl Into<multihash::Multihash> for PeerId {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn into(self) -> multihash::Multihash {
|
fn into(self) -> multihash::Multihash {
|
||||||
|
@ -198,7 +198,7 @@ impl<TSubstream> Kademlia<TSubstream> {
|
|||||||
.map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers))
|
.map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let local_node_is_providing = self.providing_keys.iter().any(|k| k.as_ref() == &key);
|
let local_node_is_providing = self.providing_keys.iter().any(|k| k == &key);
|
||||||
|
|
||||||
let provider_peers = topology
|
let provider_peers = topology
|
||||||
.get_providers(&key)
|
.get_providers(&key)
|
||||||
@ -258,7 +258,7 @@ impl<TSubstream> Kademlia<TSubstream> {
|
|||||||
/// There doesn't exist any "remove provider" message to broadcast on the network, therefore we
|
/// There doesn't exist any "remove provider" message to broadcast on the network, therefore we
|
||||||
/// will still be registered as a provider in the DHT for as long as the timeout doesn't expire.
|
/// will still be registered as a provider in the DHT for as long as the timeout doesn't expire.
|
||||||
pub fn remove_providing(&mut self, key: &Multihash) {
|
pub fn remove_providing(&mut self, key: &Multihash) {
|
||||||
if let Some(position) = self.providing_keys.iter().position(|k| k.as_ref() == key) {
|
if let Some(position) = self.providing_keys.iter().position(|k| k == key) {
|
||||||
self.providing_keys.remove(position);
|
self.providing_keys.remove(position);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -392,7 +392,7 @@ where
|
|||||||
Ok(Async::NotReady) => {},
|
Ok(Async::NotReady) => {},
|
||||||
Ok(Async::Ready(Some(_))) => {
|
Ok(Async::Ready(Some(_))) => {
|
||||||
for provided in self.providing_keys.clone().into_iter() {
|
for provided in self.providing_keys.clone().into_iter() {
|
||||||
let purpose = QueryPurpose::AddProvider(provided.as_ref().clone());
|
let purpose = QueryPurpose::AddProvider(provided.clone().into());
|
||||||
self.start_query(QueryTarget::FindPeer(provided), purpose);
|
self.start_query(QueryTarget::FindPeer(provided), purpose);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
Reference in New Issue
Block a user