mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-13 10:01:25 +00:00
Generalize TPeerId into TConnInfo (#1045)
* Generalize TPeerId into TConnInfo * Final fixes
This commit is contained in:
@ -35,22 +35,22 @@ use std::{error, fmt, hash::Hash, mem};
|
|||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
/// Implementation of `Stream` that handles a collection of nodes.
|
/// Implementation of `Stream` that handles a collection of nodes.
|
||||||
pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> {
|
pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId, TPeerId = PeerId> {
|
||||||
/// Object that handles the tasks.
|
/// Object that handles the tasks.
|
||||||
///
|
///
|
||||||
/// The user data contains the state of the task. If `Connected`, then a corresponding entry
|
/// The user data contains the state of the task. If `Connected`, then a corresponding entry
|
||||||
/// must be present in `nodes`.
|
/// must be present in `nodes`.
|
||||||
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TPeerId, TUserData>, TPeerId>,
|
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TaskState<TConnInfo, TUserData>, TConnInfo>,
|
||||||
|
|
||||||
/// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
|
/// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
|
||||||
/// must always be in the `Connected` state.
|
/// must always be in the `Connected` state.
|
||||||
nodes: FnvHashMap<TPeerId, TaskId>,
|
nodes: FnvHashMap<TPeerId, TaskId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for
|
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> fmt::Debug for
|
||||||
CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
|
CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TPeerId: fmt::Debug,
|
TConnInfo: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||||
f.debug_tuple("CollectionStream").finish()
|
f.debug_tuple("CollectionStream").finish()
|
||||||
@ -59,25 +59,25 @@ where
|
|||||||
|
|
||||||
/// State of a task.
|
/// State of a task.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
enum TaskState<TPeerId, TUserData> {
|
enum TaskState<TConnInfo, TUserData> {
|
||||||
/// Task is attempting to reach a peer.
|
/// Task is attempting to reach a peer.
|
||||||
Pending,
|
Pending,
|
||||||
/// The task is connected to a peer.
|
/// The task is connected to a peer.
|
||||||
Connected(TPeerId, TUserData),
|
Connected(TConnInfo, TUserData),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Event that can happen on the `CollectionStream`.
|
/// Event that can happen on the `CollectionStream`.
|
||||||
pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr, TUserData, TPeerId> {
|
pub enum CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> {
|
||||||
/// A connection to a node has succeeded. You must use the provided event in order to accept
|
/// A connection to a node has succeeded. You must use the provided event in order to accept
|
||||||
/// the connection.
|
/// the connection.
|
||||||
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>),
|
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>),
|
||||||
|
|
||||||
/// A connection to a node has errored.
|
/// A connection to a node has errored.
|
||||||
///
|
///
|
||||||
/// Can only happen after a node has been successfully reached.
|
/// Can only happen after a node has been successfully reached.
|
||||||
NodeClosed {
|
NodeClosed {
|
||||||
/// Identifier of the node.
|
/// Information about the connection.
|
||||||
peer_id: TPeerId,
|
conn_info: TConnInfo,
|
||||||
/// The error that happened.
|
/// The error that happened.
|
||||||
error: HandledNodeError<THandlerErr>,
|
error: HandledNodeError<THandlerErr>,
|
||||||
/// User data that was passed when accepting.
|
/// User data that was passed when accepting.
|
||||||
@ -97,18 +97,18 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr
|
|||||||
/// A node has produced an event.
|
/// A node has produced an event.
|
||||||
NodeEvent {
|
NodeEvent {
|
||||||
/// The node that has generated the event.
|
/// The node that has generated the event.
|
||||||
peer: PeerMut<'a, TInEvent, TUserData, TPeerId>,
|
peer: PeerMut<'a, TInEvent, TUserData, TConnInfo, TPeerId>,
|
||||||
/// The produced event.
|
/// The produced event.
|
||||||
event: TOutEvent,
|
event: TOutEvent,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for
|
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> fmt::Debug for
|
||||||
CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
|
CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
|
||||||
where TOutEvent: fmt::Debug,
|
where TOutEvent: fmt::Debug,
|
||||||
TReachErr: fmt::Debug,
|
TReachErr: fmt::Debug,
|
||||||
THandlerErr: fmt::Debug,
|
THandlerErr: fmt::Debug,
|
||||||
TPeerId: Eq + Hash + Clone + fmt::Debug,
|
TConnInfo: fmt::Debug,
|
||||||
TUserData: fmt::Debug,
|
TUserData: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||||
@ -118,9 +118,9 @@ where TOutEvent: fmt::Debug,
|
|||||||
.field(inner)
|
.field(inner)
|
||||||
.finish()
|
.finish()
|
||||||
},
|
},
|
||||||
CollectionEvent::NodeClosed { ref peer_id, ref error, ref user_data } => {
|
CollectionEvent::NodeClosed { ref conn_info, ref error, ref user_data } => {
|
||||||
f.debug_struct("CollectionEvent::NodeClosed")
|
f.debug_struct("CollectionEvent::NodeClosed")
|
||||||
.field("peer_id", peer_id)
|
.field("conn_info", conn_info)
|
||||||
.field("user_data", user_data)
|
.field("user_data", user_data)
|
||||||
.field("error", error)
|
.field("error", error)
|
||||||
.finish()
|
.finish()
|
||||||
@ -133,7 +133,7 @@ where TOutEvent: fmt::Debug,
|
|||||||
},
|
},
|
||||||
CollectionEvent::NodeEvent { ref peer, ref event } => {
|
CollectionEvent::NodeEvent { ref peer, ref event } => {
|
||||||
f.debug_struct("CollectionEvent::NodeEvent")
|
f.debug_struct("CollectionEvent::NodeEvent")
|
||||||
.field("peer_id", peer.id())
|
.field("conn_info", peer.info())
|
||||||
.field("event", event)
|
.field("event", event)
|
||||||
.finish()
|
.finish()
|
||||||
},
|
},
|
||||||
@ -143,24 +143,30 @@ where TOutEvent: fmt::Debug,
|
|||||||
|
|
||||||
/// Event that happens when we reach a node.
|
/// Event that happens when we reach a node.
|
||||||
#[must_use = "The node reached event is used to accept the newly-opened connection"]
|
#[must_use = "The node reached event is used to accept the newly-opened connection"]
|
||||||
pub struct CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> {
|
pub struct CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId, TPeerId = PeerId> {
|
||||||
/// Peer id we connected to.
|
/// Information about the connection, or `None` if it's been extracted.
|
||||||
peer_id: TPeerId,
|
conn_info: Option<TConnInfo>,
|
||||||
/// The task id that reached the node.
|
/// The task id that reached the node.
|
||||||
id: TaskId,
|
id: TaskId,
|
||||||
/// The `CollectionStream` we are referencing.
|
/// The `CollectionStream` we are referencing.
|
||||||
parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>,
|
parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
|
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
|
||||||
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
|
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
|
||||||
where
|
|
||||||
TPeerId: Eq + Hash + Clone,
|
|
||||||
{
|
{
|
||||||
/// Returns the peer id of the node that has been reached.
|
/// Returns the information of the connection.
|
||||||
#[inline]
|
pub fn connection_info(&self) -> &TConnInfo {
|
||||||
pub fn peer_id(&self) -> &TPeerId {
|
self.conn_info.as_ref().expect("conn_info is always Some when the object is alive; QED")
|
||||||
&self.peer_id
|
}
|
||||||
|
|
||||||
|
/// Returns the identity of the node we connected to.
|
||||||
|
pub fn peer_id(&self) -> &TPeerId
|
||||||
|
where
|
||||||
|
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
|
||||||
|
TPeerId: Eq + Hash,
|
||||||
|
{
|
||||||
|
self.connection_info().peer_id()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the reach attempt that reached the node.
|
/// Returns the reach attempt that reached the node.
|
||||||
@ -168,40 +174,54 @@ where
|
|||||||
pub fn reach_attempt_id(&self) -> ReachAttemptId {
|
pub fn reach_attempt_id(&self) -> ReachAttemptId {
|
||||||
ReachAttemptId(self.id)
|
ReachAttemptId(self.id)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
|
||||||
|
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
|
||||||
|
where
|
||||||
|
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
|
||||||
|
TPeerId: Eq + Hash,
|
||||||
|
{
|
||||||
/// Returns `true` if accepting this reached node would replace an existing connection to that
|
/// Returns `true` if accepting this reached node would replace an existing connection to that
|
||||||
/// node.
|
/// node.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn would_replace(&self) -> bool {
|
pub fn would_replace(&self) -> bool {
|
||||||
self.parent.nodes.contains_key(&self.peer_id)
|
self.parent.nodes.contains_key(self.connection_info().peer_id())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Accepts the new node.
|
/// Accepts the new node.
|
||||||
pub fn accept(self, user_data: TUserData) -> (CollectionNodeAccept<TUserData>, TPeerId) {
|
pub fn accept(mut self, user_data: TUserData) -> (CollectionNodeAccept<TConnInfo, TUserData>, TConnInfo)
|
||||||
|
where
|
||||||
|
// TODO: these two clones shouldn't be necessary if we return references
|
||||||
|
TConnInfo: Clone,
|
||||||
|
TPeerId: Clone,
|
||||||
|
{
|
||||||
|
let self_conn_info = self.conn_info.take()
|
||||||
|
.expect("conn_info is always Some when the object is alive; QED");
|
||||||
|
|
||||||
// Set the state of the task to `Connected`.
|
// Set the state of the task to `Connected`.
|
||||||
let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id);
|
let former_task_id = self.parent.nodes.insert(self_conn_info.peer_id().clone(), self.id);
|
||||||
*self.parent.inner.task(self.id)
|
*self.parent.inner.task(self.id)
|
||||||
.expect("A CollectionReachEvent is only ever created from a valid attempt; QED")
|
.expect("A CollectionReachEvent is only ever created from a valid attempt; QED")
|
||||||
.user_data_mut() = TaskState::Connected(self.peer_id.clone(), user_data);
|
.user_data_mut() = TaskState::Connected(self_conn_info.clone(), user_data);
|
||||||
|
|
||||||
// It is possible that we already have a task connected to the same peer. In this
|
// It is possible that we already have a task connected to the same peer. In this
|
||||||
// case, we need to emit a `NodeReplaced` event.
|
// case, we need to emit a `NodeReplaced` event.
|
||||||
let tasks = &mut self.parent.inner;
|
let tasks = &mut self.parent.inner;
|
||||||
let ret_value = if let Some(former_task) = former_task_id.and_then(|i| tasks.task(i)) {
|
let ret_value = if let Some(former_task) = former_task_id.and_then(|i| tasks.task(i)) {
|
||||||
debug_assert!(match *former_task.user_data() {
|
debug_assert!(match *former_task.user_data() {
|
||||||
TaskState::Connected(ref p, _) if *p == self.peer_id => true,
|
TaskState::Connected(ref p, _) if p.peer_id() == self_conn_info.peer_id() => true,
|
||||||
_ => false
|
_ => false
|
||||||
});
|
});
|
||||||
let user_data = match former_task.close().into_user_data() {
|
let (old_info, user_data) = match former_task.close().into_user_data() {
|
||||||
TaskState::Connected(_, user_data) => user_data,
|
TaskState::Connected(old_info, user_data) => (old_info, user_data),
|
||||||
_ => panic!("The former task was picked from `nodes`; all the nodes in `nodes` \
|
_ => panic!("The former task was picked from `nodes`; all the nodes in `nodes` \
|
||||||
are always in the connected state")
|
are always in the connected state")
|
||||||
};
|
};
|
||||||
// TODO: we unfortunately have to clone the peer id here
|
(CollectionNodeAccept::ReplacedExisting(old_info, user_data), self_conn_info)
|
||||||
(CollectionNodeAccept::ReplacedExisting(user_data), self.peer_id.clone())
|
|
||||||
} else {
|
} else {
|
||||||
// TODO: we unfortunately have to clone the peer id here
|
(CollectionNodeAccept::NewEntry, self_conn_info)
|
||||||
(CollectionNodeAccept::NewEntry, self.peer_id.clone())
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Don't run the destructor.
|
// Don't run the destructor.
|
||||||
@ -214,29 +234,29 @@ where
|
|||||||
///
|
///
|
||||||
/// Has the same effect as dropping the event without accepting it.
|
/// Has the same effect as dropping the event without accepting it.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn deny(self) -> TPeerId {
|
pub fn deny(mut self) -> TConnInfo {
|
||||||
// TODO: we unfortunately have to clone the id here, in order to be explicit
|
let conn_info = self.conn_info.take()
|
||||||
let peer_id = self.peer_id.clone();
|
.expect("conn_info is always Some when the object is alive; QED");
|
||||||
drop(self); // Just to be explicit
|
drop(self); // Just to be explicit
|
||||||
peer_id
|
conn_info
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for
|
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> fmt::Debug for
|
||||||
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
|
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TPeerId: Eq + Hash + Clone + fmt::Debug,
|
TConnInfo: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||||
f.debug_struct("CollectionReachEvent")
|
f.debug_struct("CollectionReachEvent")
|
||||||
.field("peer_id", &self.peer_id)
|
.field("conn_info", &self.conn_info)
|
||||||
.field("reach_attempt_id", &self.reach_attempt_id())
|
.field("reach_attempt_id", &self.reach_attempt_id())
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId> Drop for
|
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId> Drop for
|
||||||
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
|
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let task = self.parent.inner.task(self.id)
|
let task = self.parent.inner.task(self.id)
|
||||||
@ -251,9 +271,10 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeer
|
|||||||
|
|
||||||
/// Outcome of accepting a node.
|
/// Outcome of accepting a node.
|
||||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||||
pub enum CollectionNodeAccept<TUserData> {
|
pub enum CollectionNodeAccept<TConnInfo, TUserData> {
|
||||||
/// We replaced an existing node. Returns the user data that was assigned to this node.
|
/// We replaced an existing node. Returns the information about the old connection and the
|
||||||
ReplacedExisting(TUserData),
|
/// user data that was assigned to this node.
|
||||||
|
ReplacedExisting(TConnInfo, TUserData),
|
||||||
/// We didn't replace anything existing.
|
/// We didn't replace anything existing.
|
||||||
NewEntry,
|
NewEntry,
|
||||||
}
|
}
|
||||||
@ -262,10 +283,28 @@ pub enum CollectionNodeAccept<TUserData> {
|
|||||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
pub struct ReachAttemptId(TaskId);
|
pub struct ReachAttemptId(TaskId);
|
||||||
|
|
||||||
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
|
/// Information about a connection.
|
||||||
CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>
|
pub trait ConnectionInfo {
|
||||||
|
/// Identity of the node we are connected to.
|
||||||
|
type PeerId: Eq + Hash;
|
||||||
|
|
||||||
|
/// Returns the identity of the node we are connected to on this connection.
|
||||||
|
fn peer_id(&self) -> &Self::PeerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionInfo for PeerId {
|
||||||
|
type PeerId = PeerId;
|
||||||
|
|
||||||
|
fn peer_id(&self) -> &PeerId {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
|
||||||
|
CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TPeerId: Eq + Hash + Clone,
|
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
|
||||||
|
TPeerId: Eq + Hash,
|
||||||
{
|
{
|
||||||
/// Creates a new empty collection.
|
/// Creates a new empty collection.
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -283,8 +322,8 @@ where
|
|||||||
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: THandler)
|
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: THandler)
|
||||||
-> ReachAttemptId
|
-> ReachAttemptId
|
||||||
where
|
where
|
||||||
TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr> + Send + 'static,
|
TFut: Future<Item = (TConnInfo, TMuxer), Error = TReachErr> + Send + 'static,
|
||||||
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
|
||||||
TReachErr: error::Error + Send + 'static,
|
TReachErr: error::Error + Send + 'static,
|
||||||
@ -293,7 +332,7 @@ where
|
|||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
|
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
|
||||||
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
|
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
|
||||||
TPeerId: Send + 'static,
|
TConnInfo: Send + 'static,
|
||||||
{
|
{
|
||||||
ReachAttemptId(self.inner.add_reach_attempt(future, TaskState::Pending, handler))
|
ReachAttemptId(self.inner.add_reach_attempt(future, TaskState::Pending, handler))
|
||||||
}
|
}
|
||||||
@ -301,7 +340,7 @@ where
|
|||||||
/// Interrupts a reach attempt.
|
/// Interrupts a reach attempt.
|
||||||
///
|
///
|
||||||
/// Returns `Ok` if something was interrupted, and `Err` if the ID is not or no longer valid.
|
/// Returns `Ok` if something was interrupted, and `Err` if the ID is not or no longer valid.
|
||||||
pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<InterruptedReachAttempt<TInEvent, TPeerId, TUserData>, InterruptError> {
|
pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>, InterruptError> {
|
||||||
match self.inner.task(id.0) {
|
match self.inner.task(id.0) {
|
||||||
None => Err(InterruptError::ReachAttemptNotFound),
|
None => Err(InterruptError::ReachAttemptNotFound),
|
||||||
Some(task) => {
|
Some(task) => {
|
||||||
@ -330,7 +369,7 @@ where
|
|||||||
///
|
///
|
||||||
/// Returns `None` if we don't have a connection to this peer.
|
/// Returns `None` if we don't have a connection to this peer.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn peer_mut(&mut self, id: &TPeerId) -> Option<PeerMut<'_, TInEvent, TUserData, TPeerId>> {
|
pub fn peer_mut(&mut self, id: &TPeerId) -> Option<PeerMut<'_, TInEvent, TUserData, TConnInfo, TPeerId>> {
|
||||||
let task = match self.nodes.get(id) {
|
let task = match self.nodes.get(id) {
|
||||||
Some(&task) => task,
|
Some(&task) => task,
|
||||||
None => return None,
|
None => return None,
|
||||||
@ -366,7 +405,10 @@ where
|
|||||||
/// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to
|
/// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to
|
||||||
/// > remove the `Err` variant, but also because we want the `CollectionStream` to stay
|
/// > remove the `Err` variant, but also because we want the `CollectionStream` to stay
|
||||||
/// > borrowed if necessary.
|
/// > borrowed if necessary.
|
||||||
pub fn poll(&mut self) -> Async<CollectionEvent<'_, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>> {
|
pub fn poll(&mut self) -> Async<CollectionEvent<'_, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>>
|
||||||
|
where
|
||||||
|
TConnInfo: Clone, // TODO: Clone shouldn't be necessary
|
||||||
|
{
|
||||||
let item = match self.inner.poll() {
|
let item = match self.inner.poll() {
|
||||||
Async::Ready(item) => item,
|
Async::Ready(item) => item,
|
||||||
Async::NotReady => return Async::NotReady,
|
Async::NotReady => return Async::NotReady,
|
||||||
@ -395,12 +437,12 @@ where
|
|||||||
panic!("The HandledNodesTasks is guaranteed to always return the handler \
|
panic!("The HandledNodesTasks is guaranteed to always return the handler \
|
||||||
when producing a TaskClosedEvent::Reach error");
|
when producing a TaskClosedEvent::Reach error");
|
||||||
},
|
},
|
||||||
(TaskState::Connected(peer_id, user_data), TaskClosedEvent::Node(err), _handler) => {
|
(TaskState::Connected(conn_info, user_data), TaskClosedEvent::Node(err), _handler) => {
|
||||||
debug_assert!(_handler.is_none());
|
debug_assert!(_handler.is_none());
|
||||||
let _node_task_id = self.nodes.remove(&peer_id);
|
let _node_task_id = self.nodes.remove(conn_info.peer_id());
|
||||||
debug_assert_eq!(_node_task_id, Some(id));
|
debug_assert_eq!(_node_task_id, Some(id));
|
||||||
Async::Ready(CollectionEvent::NodeClosed {
|
Async::Ready(CollectionEvent::NodeClosed {
|
||||||
peer_id,
|
conn_info,
|
||||||
error: err,
|
error: err,
|
||||||
user_data,
|
user_data,
|
||||||
})
|
})
|
||||||
@ -411,18 +453,18 @@ where
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
HandledNodesEvent::NodeReached { task, peer_id } => {
|
HandledNodesEvent::NodeReached { task, conn_info } => {
|
||||||
let id = task.id();
|
let id = task.id();
|
||||||
drop(task);
|
drop(task);
|
||||||
Async::Ready(CollectionEvent::NodeReached(CollectionReachEvent {
|
Async::Ready(CollectionEvent::NodeReached(CollectionReachEvent {
|
||||||
parent: self,
|
parent: self,
|
||||||
id,
|
id,
|
||||||
peer_id,
|
conn_info: Some(conn_info),
|
||||||
}))
|
}))
|
||||||
},
|
},
|
||||||
HandledNodesEvent::NodeEvent { task, event } => {
|
HandledNodesEvent::NodeEvent { task, event } => {
|
||||||
let peer_id = match task.user_data() {
|
let conn_info = match task.user_data() {
|
||||||
TaskState::Connected(peer_id, _) => peer_id.clone(),
|
TaskState::Connected(conn_info, _) => conn_info.clone(),
|
||||||
_ => panic!("we can only receive NodeEvent events from a task after we \
|
_ => panic!("we can only receive NodeEvent events from a task after we \
|
||||||
received a corresponding NodeReached event from that same task; \
|
received a corresponding NodeReached event from that same task; \
|
||||||
when we receive a NodeReached event, we ensure that the entry in \
|
when we receive a NodeReached event, we ensure that the entry in \
|
||||||
@ -432,7 +474,7 @@ where
|
|||||||
Async::Ready(CollectionEvent::NodeEvent {
|
Async::Ready(CollectionEvent::NodeEvent {
|
||||||
// TODO: normally we'd build a `PeerMut` manually here, but the borrow checker
|
// TODO: normally we'd build a `PeerMut` manually here, but the borrow checker
|
||||||
// doesn't like it
|
// doesn't like it
|
||||||
peer: self.peer_mut(&peer_id)
|
peer: self.peer_mut(&conn_info.peer_id())
|
||||||
.expect("we can only receive NodeEvent events from a task after we \
|
.expect("we can only receive NodeEvent events from a task after we \
|
||||||
received a corresponding NodeReached event from that same task;\
|
received a corresponding NodeReached event from that same task;\
|
||||||
when that happens, peer_mut will always return Some; QED"),
|
when that happens, peer_mut will always return Some; QED"),
|
||||||
@ -470,14 +512,14 @@ impl fmt::Display for InterruptError {
|
|||||||
impl error::Error for InterruptError {}
|
impl error::Error for InterruptError {}
|
||||||
|
|
||||||
/// Reach attempt after it has been interrupted.
|
/// Reach attempt after it has been interrupted.
|
||||||
pub struct InterruptedReachAttempt<TInEvent, TPeerId, TUserData> {
|
pub struct InterruptedReachAttempt<TInEvent, TConnInfo, TUserData> {
|
||||||
inner: ClosedTask<TInEvent, TaskState<TPeerId, TUserData>>,
|
inner: ClosedTask<TInEvent, TaskState<TConnInfo, TUserData>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TInEvent, TPeerId, TUserData> fmt::Debug for InterruptedReachAttempt<TInEvent, TPeerId, TUserData>
|
impl<TInEvent, TConnInfo, TUserData> fmt::Debug for InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>
|
||||||
where
|
where
|
||||||
TUserData: fmt::Debug,
|
TUserData: fmt::Debug,
|
||||||
TPeerId: fmt::Debug,
|
TConnInfo: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||||
f.debug_tuple("InterruptedReachAttempt")
|
f.debug_tuple("InterruptedReachAttempt")
|
||||||
@ -487,23 +529,31 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Access to a peer in the collection.
|
/// Access to a peer in the collection.
|
||||||
pub struct PeerMut<'a, TInEvent, TUserData, TPeerId = PeerId> {
|
pub struct PeerMut<'a, TInEvent, TUserData, TConnInfo = PeerId, TPeerId = PeerId> {
|
||||||
inner: HandledNodesTask<'a, TInEvent, TaskState<TPeerId, TUserData>>,
|
inner: HandledNodesTask<'a, TInEvent, TaskState<TConnInfo, TUserData>>,
|
||||||
nodes: &'a mut FnvHashMap<TPeerId, TaskId>,
|
nodes: &'a mut FnvHashMap<TPeerId, TaskId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TInEvent, TUserData, TPeerId> PeerMut<'a, TInEvent, TUserData, TPeerId>
|
impl<'a, TInEvent, TUserData, TConnInfo, TPeerId> PeerMut<'a, TInEvent, TUserData, TConnInfo, TPeerId> {
|
||||||
where
|
/// Returns the information of the connection with the peer.
|
||||||
TPeerId: Eq + Hash,
|
pub fn info(&self) -> &TConnInfo {
|
||||||
{
|
|
||||||
/// Returns the identifier of the peer.
|
|
||||||
pub fn id(&self) -> &TPeerId {
|
|
||||||
match self.inner.user_data() {
|
match self.inner.user_data() {
|
||||||
TaskState::Connected(peer_id, _) => peer_id,
|
TaskState::Connected(conn_info, _) => conn_info,
|
||||||
_ => panic!("A PeerMut is only ever constructed from a peer in the connected \
|
_ => panic!("A PeerMut is only ever constructed from a peer in the connected \
|
||||||
state; QED")
|
state; QED")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, TInEvent, TUserData, TConnInfo, TPeerId> PeerMut<'a, TInEvent, TUserData, TConnInfo, TPeerId>
|
||||||
|
where
|
||||||
|
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
|
||||||
|
TPeerId: Eq + Hash,
|
||||||
|
{
|
||||||
|
/// Returns the identity of the peer.
|
||||||
|
pub fn id(&self) -> &TPeerId {
|
||||||
|
self.info().peer_id()
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the user data that was stored in the collections when we accepted the connection.
|
/// Returns the user data that was stored in the collections when we accepted the connection.
|
||||||
pub fn user_data(&self) -> &TUserData {
|
pub fn user_data(&self) -> &TUserData {
|
||||||
@ -534,8 +584,8 @@ where
|
|||||||
/// No further event will be generated for this node.
|
/// No further event will be generated for this node.
|
||||||
pub fn close(self) -> TUserData {
|
pub fn close(self) -> TUserData {
|
||||||
let task_id = self.inner.id();
|
let task_id = self.inner.id();
|
||||||
if let TaskState::Connected(peer_id, user_data) = self.inner.close().into_user_data() {
|
if let TaskState::Connected(conn_info, user_data) = self.inner.close().into_user_data() {
|
||||||
let old_task_id = self.nodes.remove(&peer_id);
|
let old_task_id = self.nodes.remove(conn_info.peer_id());
|
||||||
debug_assert_eq!(old_task_id, Some(task_id));
|
debug_assert_eq!(old_task_id, Some(task_id));
|
||||||
user_data
|
user_data
|
||||||
} else {
|
} else {
|
||||||
@ -551,7 +601,7 @@ where
|
|||||||
/// The reach attempt will only be effectively cancelled once the peer (the object you're
|
/// The reach attempt will only be effectively cancelled once the peer (the object you're
|
||||||
/// manipulating) has received some network activity. However no event will be ever be
|
/// manipulating) has received some network activity. However no event will be ever be
|
||||||
/// generated from this reach attempt, and this takes effect immediately.
|
/// generated from this reach attempt, and this takes effect immediately.
|
||||||
pub fn take_over(&mut self, id: InterruptedReachAttempt<TInEvent, TPeerId, TUserData>) {
|
pub fn take_over(&mut self, id: InterruptedReachAttempt<TInEvent, TConnInfo, TUserData>) {
|
||||||
let _state = self.inner.take_over(id.inner);
|
let _state = self.inner.take_over(id.inner);
|
||||||
debug_assert!(if let TaskState::Pending = _state { true } else { false });
|
debug_assert!(if let TaskState::Pending = _state { true } else { false });
|
||||||
}
|
}
|
||||||
|
@ -56,7 +56,7 @@ mod tests;
|
|||||||
// conditions in the user's code. See similar comments in the documentation of `NodeStream`.
|
// conditions in the user's code. See similar comments in the documentation of `NodeStream`.
|
||||||
|
|
||||||
/// Implementation of `Stream` that handles a collection of nodes.
|
/// Implementation of `Stream` that handles a collection of nodes.
|
||||||
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> {
|
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId> {
|
||||||
/// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts
|
/// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts
|
||||||
/// the task. It is possible that we receive messages from tasks that used to be in this list
|
/// the task. It is possible that we receive messages from tasks that used to be in this list
|
||||||
/// but no longer are, in which case we should ignore them.
|
/// but no longer are, in which case we should ignore them.
|
||||||
@ -73,13 +73,13 @@ pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THand
|
|||||||
local_spawns: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>,
|
local_spawns: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>,
|
||||||
|
|
||||||
/// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
|
/// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
|
||||||
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)>,
|
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo>, TaskId)>,
|
||||||
/// Receiver side for the events.
|
/// Receiver side for the events.
|
||||||
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)>,
|
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo>, TaskId)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId> fmt::Debug for
|
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo> fmt::Debug for
|
||||||
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>
|
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>
|
||||||
where
|
where
|
||||||
TUserData: fmt::Debug
|
TUserData: fmt::Debug
|
||||||
{
|
{
|
||||||
@ -126,30 +126,31 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Prototype for a `NodeHandler`.
|
/// Prototype for a `NodeHandler`.
|
||||||
pub trait IntoNodeHandler<TPeerId = PeerId> {
|
pub trait IntoNodeHandler<TConnInfo = PeerId> {
|
||||||
/// The node handler.
|
/// The node handler.
|
||||||
type Handler: NodeHandler;
|
type Handler: NodeHandler;
|
||||||
|
|
||||||
/// Builds the node handler.
|
/// Builds the node handler.
|
||||||
///
|
///
|
||||||
/// The `TPeerId` is the id of the node the handler is going to handle.
|
/// The `TConnInfo` is the information about the connection that the handler is going to handle.
|
||||||
fn into_handler(self, remote_peer_id: &TPeerId) -> Self::Handler;
|
/// This is generated by the `Transport` and typically implements the `ConnectionInfo` trait.
|
||||||
|
fn into_handler(self, remote_conn_info: &TConnInfo) -> Self::Handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, TPeerId> IntoNodeHandler<TPeerId> for T
|
impl<T, TConnInfo> IntoNodeHandler<TConnInfo> for T
|
||||||
where T: NodeHandler
|
where T: NodeHandler
|
||||||
{
|
{
|
||||||
type Handler = Self;
|
type Handler = Self;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn into_handler(self, _: &TPeerId) -> Self {
|
fn into_handler(self, _: &TConnInfo) -> Self {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Event that can happen on the `HandledNodesTasks`.
|
/// Event that can happen on the `HandledNodesTasks`.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId = PeerId> {
|
pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId> {
|
||||||
/// A task has been closed.
|
/// A task has been closed.
|
||||||
///
|
///
|
||||||
/// This happens once the node handler closes or an error happens.
|
/// This happens once the node handler closes or an error happens.
|
||||||
@ -169,7 +170,7 @@ pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THa
|
|||||||
/// The task that succeeded.
|
/// The task that succeeded.
|
||||||
task: Task<'a, TInEvent, TUserData>,
|
task: Task<'a, TInEvent, TUserData>,
|
||||||
/// Identifier of the node.
|
/// Identifier of the node.
|
||||||
peer_id: TPeerId,
|
conn_info: TConnInfo,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// A task has produced an event.
|
/// A task has produced an event.
|
||||||
@ -185,8 +186,8 @@ pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THa
|
|||||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
pub struct TaskId(usize);
|
pub struct TaskId(usize);
|
||||||
|
|
||||||
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>
|
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>
|
||||||
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>
|
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>
|
||||||
{
|
{
|
||||||
/// Creates a new empty collection.
|
/// Creates a new empty collection.
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -209,8 +210,8 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
|
|||||||
/// events.
|
/// events.
|
||||||
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, user_data: TUserData, handler: TIntoHandler) -> TaskId
|
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, user_data: TUserData, handler: TIntoHandler) -> TaskId
|
||||||
where
|
where
|
||||||
TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr> + Send + 'static,
|
TFut: Future<Item = (TConnInfo, TMuxer), Error = TReachErr> + Send + 'static,
|
||||||
TIntoHandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
TIntoHandler: IntoNodeHandler<TConnInfo> + Send + 'static,
|
||||||
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
TReachErr: error::Error + Send + 'static,
|
TReachErr: error::Error + Send + 'static,
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
@ -219,7 +220,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
|
|||||||
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
|
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
|
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
|
||||||
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
|
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
|
||||||
TPeerId: Send + 'static,
|
TConnInfo: Send + 'static,
|
||||||
{
|
{
|
||||||
let task_id = self.next_task_id;
|
let task_id = self.next_task_id;
|
||||||
self.next_task_id.0 += 1;
|
self.next_task_id.0 += 1;
|
||||||
@ -273,7 +274,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Provides an API similar to `Stream`, except that it cannot produce an error.
|
/// Provides an API similar to `Stream`, except that it cannot produce an error.
|
||||||
pub fn poll(&mut self) -> Async<HandledNodesEvent<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeerId>> {
|
pub fn poll(&mut self) -> Async<HandledNodesEvent<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>> {
|
||||||
let (message, task_id) = match self.poll_inner() {
|
let (message, task_id) = match self.poll_inner() {
|
||||||
Async::Ready(r) => r,
|
Async::Ready(r) => r,
|
||||||
Async::NotReady => return Async::NotReady,
|
Async::NotReady => return Async::NotReady,
|
||||||
@ -289,13 +290,13 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
|
|||||||
event
|
event
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
InToExtMessage::NodeReached(peer_id) => {
|
InToExtMessage::NodeReached(conn_info) => {
|
||||||
HandledNodesEvent::NodeReached {
|
HandledNodesEvent::NodeReached {
|
||||||
task: match self.tasks.entry(task_id) {
|
task: match self.tasks.entry(task_id) {
|
||||||
Entry::Occupied(inner) => Task { inner },
|
Entry::Occupied(inner) => Task { inner },
|
||||||
Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED")
|
Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED")
|
||||||
},
|
},
|
||||||
peer_id
|
conn_info
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
InToExtMessage::TaskClosed(result, handler) => {
|
InToExtMessage::TaskClosed(result, handler) => {
|
||||||
@ -318,7 +319,7 @@ impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TPeer
|
|||||||
/// split `poll()` in two. This method returns an `InToExtMessage` that is guaranteed to come
|
/// split `poll()` in two. This method returns an `InToExtMessage` that is guaranteed to come
|
||||||
/// from an alive task.
|
/// from an alive task.
|
||||||
// TODO: look into merging with `poll()`
|
// TODO: look into merging with `poll()`
|
||||||
fn poll_inner(&mut self) -> Async<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)> {
|
fn poll_inner(&mut self) -> Async<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo>, TaskId)> {
|
||||||
for to_spawn in self.to_spawn.drain() {
|
for to_spawn in self.to_spawn.drain() {
|
||||||
// We try to use the default executor, but fall back to polling the task manually if
|
// We try to use the default executor, but fall back to polling the task manually if
|
||||||
// no executor is available. This makes it possible to use the core in environments
|
// no executor is available. This makes it possible to use the core in environments
|
||||||
@ -490,9 +491,9 @@ enum ExtToInMessage<TInEvent> {
|
|||||||
|
|
||||||
/// Message to transmit from a task to the public API.
|
/// Message to transmit from a task to the public API.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> {
|
enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo> {
|
||||||
/// A connection to a node has succeeded.
|
/// A connection to a node has succeeded.
|
||||||
NodeReached(TPeerId),
|
NodeReached(TConnInfo),
|
||||||
/// The task closed.
|
/// The task closed.
|
||||||
TaskClosed(TaskClosedEvent<TReachErr, THandlerErr>, Option<TIntoHandler>),
|
TaskClosed(TaskClosedEvent<TReachErr, THandlerErr>, Option<TIntoHandler>),
|
||||||
/// An event from the node.
|
/// An event from the node.
|
||||||
@ -501,28 +502,28 @@ enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> {
|
|||||||
|
|
||||||
/// Implementation of `Future` that handles a single node, and all the communications between
|
/// Implementation of `Future` that handles a single node, and all the communications between
|
||||||
/// the various components of the `HandledNodesTasks`.
|
/// the various components of the `HandledNodesTasks`.
|
||||||
struct NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId>
|
struct NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TConnInfo>
|
||||||
where
|
where
|
||||||
TMuxer: StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
TIntoHandler: IntoNodeHandler<TPeerId>,
|
TIntoHandler: IntoNodeHandler<TConnInfo>,
|
||||||
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
|
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
|
||||||
{
|
{
|
||||||
/// Sender to transmit events to the outside.
|
/// Sender to transmit events to the outside.
|
||||||
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, <TIntoHandler::Handler as NodeHandler>::Error, TPeerId>, TaskId)>,
|
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, <TIntoHandler::Handler as NodeHandler>::Error, TConnInfo>, TaskId)>,
|
||||||
/// Receiving end for events sent from the main `HandledNodesTasks`.
|
/// Receiving end for events sent from the main `HandledNodesTasks`.
|
||||||
in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<ExtToInMessage<TInEvent>>>,
|
in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<ExtToInMessage<TInEvent>>>,
|
||||||
/// Inner state of the `NodeTask`.
|
/// Inner state of the `NodeTask`.
|
||||||
inner: NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TPeerId>,
|
inner: NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TConnInfo>,
|
||||||
/// Identifier of the attempt.
|
/// Identifier of the attempt.
|
||||||
id: TaskId,
|
id: TaskId,
|
||||||
/// Channels to keep alive for as long as we don't have an acknowledgment from the remote.
|
/// Channels to keep alive for as long as we don't have an acknowledgment from the remote.
|
||||||
taken_over: SmallVec<[mpsc::UnboundedSender<ExtToInMessage<TInEvent>>; 1]>,
|
taken_over: SmallVec<[mpsc::UnboundedSender<ExtToInMessage<TInEvent>>; 1]>,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TPeerId>
|
enum NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TConnInfo>
|
||||||
where
|
where
|
||||||
TMuxer: StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
TIntoHandler: IntoNodeHandler<TPeerId>,
|
TIntoHandler: IntoNodeHandler<TConnInfo>,
|
||||||
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
|
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
|
||||||
{
|
{
|
||||||
/// Future to resolve to connect to the node.
|
/// Future to resolve to connect to the node.
|
||||||
@ -547,12 +548,12 @@ where
|
|||||||
Poisoned,
|
Poisoned,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId> Future for
|
impl<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TConnInfo> Future for
|
||||||
NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId>
|
NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TConnInfo>
|
||||||
where
|
where
|
||||||
TMuxer: StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr>,
|
TFut: Future<Item = (TConnInfo, TMuxer), Error = TReachErr>,
|
||||||
TIntoHandler: IntoNodeHandler<TPeerId>,
|
TIntoHandler: IntoNodeHandler<TConnInfo>,
|
||||||
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
|
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Item = ();
|
||||||
@ -579,9 +580,9 @@ where
|
|||||||
}
|
}
|
||||||
// Check whether dialing succeeded.
|
// Check whether dialing succeeded.
|
||||||
match future.poll() {
|
match future.poll() {
|
||||||
Ok(Async::Ready((peer_id, muxer))) => {
|
Ok(Async::Ready((conn_info, muxer))) => {
|
||||||
let mut node = HandledNode::new(muxer, handler.into_handler(&peer_id));
|
let mut node = HandledNode::new(muxer, handler.into_handler(&conn_info));
|
||||||
let event = InToExtMessage::NodeReached(peer_id);
|
let event = InToExtMessage::NodeReached(conn_info);
|
||||||
for event in events_buffer {
|
for event in events_buffer {
|
||||||
node.inject_event(event);
|
node.inject_event(event);
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@ pub mod listeners;
|
|||||||
pub mod node;
|
pub mod node;
|
||||||
pub mod raw_swarm;
|
pub mod raw_swarm;
|
||||||
|
|
||||||
|
pub use self::collection::ConnectionInfo;
|
||||||
pub use self::node::Substream;
|
pub use self::node::Substream;
|
||||||
pub use self::handled_node::{NodeHandlerEvent, NodeHandlerEndpoint};
|
pub use self::handled_node::{NodeHandlerEvent, NodeHandlerEndpoint};
|
||||||
pub use self::raw_swarm::{ConnectedPoint, Peer, RawSwarm, RawSwarmEvent};
|
pub use self::raw_swarm::{ConnectedPoint, Peer, RawSwarm, RawSwarmEvent};
|
||||||
|
@ -27,6 +27,7 @@ use crate::{
|
|||||||
CollectionNodeAccept,
|
CollectionNodeAccept,
|
||||||
CollectionReachEvent,
|
CollectionReachEvent,
|
||||||
CollectionStream,
|
CollectionStream,
|
||||||
|
ConnectionInfo,
|
||||||
ReachAttemptId
|
ReachAttemptId
|
||||||
},
|
},
|
||||||
handled_node::{
|
handled_node::{
|
||||||
@ -53,7 +54,7 @@ use std::{
|
|||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
/// Implementation of `Stream` that handles the nodes.
|
/// Implementation of `Stream` that handles the nodes.
|
||||||
pub struct RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId = PeerId>
|
pub struct RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo = PeerId, TPeerId = PeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
{
|
{
|
||||||
@ -61,7 +62,7 @@ where
|
|||||||
listeners: ListenersStream<TTrans>,
|
listeners: ListenersStream<TTrans>,
|
||||||
|
|
||||||
/// The nodes currently active.
|
/// The nodes currently active.
|
||||||
active_nodes: CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, (), TPeerId>,
|
active_nodes: CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TConnInfo>, THandlerErr, (), TConnInfo, TPeerId>,
|
||||||
|
|
||||||
/// The reach attempts of the swarm.
|
/// The reach attempts of the swarm.
|
||||||
/// This needs to be a separate struct in order to handle multiple mutable borrows issues.
|
/// This needs to be a separate struct in order to handle multiple mutable borrows issues.
|
||||||
@ -71,10 +72,11 @@ where
|
|||||||
incoming_limit: Option<u32>,
|
incoming_limit: Option<u32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for
|
impl<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> fmt::Debug for
|
||||||
RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport + fmt::Debug,
|
TTrans: fmt::Debug + Transport,
|
||||||
|
TConnInfo: fmt::Debug,
|
||||||
TPeerId: fmt::Debug + Eq + Hash,
|
TPeerId: fmt::Debug + Eq + Hash,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||||
@ -130,7 +132,7 @@ struct OutReachAttempt {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Event that can happen on the `RawSwarm`.
|
/// Event that can happen on the `RawSwarm`.
|
||||||
pub enum RawSwarmEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a = PeerId>
|
pub enum RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo = PeerId, TPeerId = PeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
{
|
{
|
||||||
@ -145,7 +147,7 @@ where
|
|||||||
},
|
},
|
||||||
|
|
||||||
/// A new connection arrived on a listener.
|
/// A new connection arrived on a listener.
|
||||||
IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
|
IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>),
|
||||||
|
|
||||||
/// A new connection was arriving on a listener, but an error happened when negotiating it.
|
/// A new connection was arriving on a listener, but an error happened when negotiating it.
|
||||||
///
|
///
|
||||||
@ -162,8 +164,8 @@ where
|
|||||||
|
|
||||||
/// A new connection to a peer has been opened.
|
/// A new connection to a peer has been opened.
|
||||||
Connected {
|
Connected {
|
||||||
/// Id of the peer.
|
/// Information about the connection, including the peer ID.
|
||||||
peer_id: TPeerId,
|
conn_info: TConnInfo,
|
||||||
/// If `Listener`, then we received the connection. If `Dial`, then it's a connection that
|
/// If `Listener`, then we received the connection. If `Dial`, then it's a connection that
|
||||||
/// we opened.
|
/// we opened.
|
||||||
endpoint: ConnectedPoint,
|
endpoint: ConnectedPoint,
|
||||||
@ -171,8 +173,12 @@ where
|
|||||||
|
|
||||||
/// A connection to a peer has been replaced with a new one.
|
/// A connection to a peer has been replaced with a new one.
|
||||||
Replaced {
|
Replaced {
|
||||||
/// Id of the peer.
|
/// Information about the new connection. The `TPeerId` is the same as the one as the one
|
||||||
peer_id: TPeerId,
|
/// in `old_info`.
|
||||||
|
new_info: TConnInfo,
|
||||||
|
/// Information about the old connection. The `TPeerId` is the same as the one as the one
|
||||||
|
/// in `new_info`.
|
||||||
|
old_info: TConnInfo,
|
||||||
/// Endpoint we were connected to.
|
/// Endpoint we were connected to.
|
||||||
closed_endpoint: ConnectedPoint,
|
closed_endpoint: ConnectedPoint,
|
||||||
/// If `Listener`, then we received the connection. If `Dial`, then it's a connection that
|
/// If `Listener`, then we received the connection. If `Dial`, then it's a connection that
|
||||||
@ -182,8 +188,8 @@ where
|
|||||||
|
|
||||||
/// The handler of a node has produced an error.
|
/// The handler of a node has produced an error.
|
||||||
NodeClosed {
|
NodeClosed {
|
||||||
/// Identifier of the node.
|
/// Information about the connection that has been closed.
|
||||||
peer_id: TPeerId,
|
conn_info: TConnInfo,
|
||||||
/// Endpoint we were connected to.
|
/// Endpoint we were connected to.
|
||||||
endpoint: ConnectedPoint,
|
endpoint: ConnectedPoint,
|
||||||
/// The error that happened.
|
/// The error that happened.
|
||||||
@ -202,7 +208,7 @@ where
|
|||||||
multiaddr: Multiaddr,
|
multiaddr: Multiaddr,
|
||||||
|
|
||||||
/// The error that happened.
|
/// The error that happened.
|
||||||
error: RawSwarmReachError<TTrans::Error, TPeerId>,
|
error: RawSwarmReachError<TTrans::Error, TConnInfo>,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Failed to reach a peer that we were trying to dial.
|
/// Failed to reach a peer that we were trying to dial.
|
||||||
@ -219,20 +225,21 @@ where
|
|||||||
|
|
||||||
/// A node produced a custom event.
|
/// A node produced a custom event.
|
||||||
NodeEvent {
|
NodeEvent {
|
||||||
/// Id of the node that produced the event.
|
/// Connection that produced the event.
|
||||||
peer_id: TPeerId,
|
conn_info: TConnInfo,
|
||||||
/// Event that was produced by the node.
|
/// Event that was produced by the node.
|
||||||
event: TOutEvent,
|
event: TOutEvent,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> fmt::Debug for
|
||||||
RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TOutEvent: fmt::Debug,
|
TOutEvent: fmt::Debug,
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
TTrans::Error: fmt::Debug,
|
TTrans::Error: fmt::Debug,
|
||||||
THandlerErr: fmt::Debug,
|
THandlerErr: fmt::Debug,
|
||||||
|
TConnInfo: fmt::Debug,
|
||||||
TPeerId: fmt::Debug,
|
TPeerId: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||||
@ -256,22 +263,23 @@ where
|
|||||||
.field("error", error)
|
.field("error", error)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
RawSwarmEvent::Connected { ref peer_id, ref endpoint } => {
|
RawSwarmEvent::Connected { ref conn_info, ref endpoint } => {
|
||||||
f.debug_struct("Connected")
|
f.debug_struct("Connected")
|
||||||
.field("peer_id", peer_id)
|
.field("conn_info", conn_info)
|
||||||
.field("endpoint", endpoint)
|
.field("endpoint", endpoint)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
RawSwarmEvent::Replaced { ref peer_id, ref closed_endpoint, ref endpoint } => {
|
RawSwarmEvent::Replaced { ref new_info, ref old_info, ref closed_endpoint, ref endpoint } => {
|
||||||
f.debug_struct("Replaced")
|
f.debug_struct("Replaced")
|
||||||
.field("peer_id", peer_id)
|
.field("new_info", new_info)
|
||||||
|
.field("old_info", old_info)
|
||||||
.field("closed_endpoint", closed_endpoint)
|
.field("closed_endpoint", closed_endpoint)
|
||||||
.field("endpoint", endpoint)
|
.field("endpoint", endpoint)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
RawSwarmEvent::NodeClosed { ref peer_id, ref endpoint, ref error } => {
|
RawSwarmEvent::NodeClosed { ref conn_info, ref endpoint, ref error } => {
|
||||||
f.debug_struct("NodeClosed")
|
f.debug_struct("NodeClosed")
|
||||||
.field("peer_id", peer_id)
|
.field("conn_info", conn_info)
|
||||||
.field("endpoint", endpoint)
|
.field("endpoint", endpoint)
|
||||||
.field("error", error)
|
.field("error", error)
|
||||||
.finish()
|
.finish()
|
||||||
@ -290,9 +298,9 @@ where
|
|||||||
.field("error", error)
|
.field("error", error)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
RawSwarmEvent::NodeEvent { ref peer_id, ref event } => {
|
RawSwarmEvent::NodeEvent { ref conn_info, ref event } => {
|
||||||
f.debug_struct("NodeEvent")
|
f.debug_struct("NodeEvent")
|
||||||
.field("peer_id", peer_id)
|
.field("conn_info", conn_info)
|
||||||
.field("event", event)
|
.field("event", event)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
@ -302,23 +310,23 @@ where
|
|||||||
|
|
||||||
/// Internal error type that contains all the possible errors that can happen in a reach attempt.
|
/// Internal error type that contains all the possible errors that can happen in a reach attempt.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum InternalReachErr<TTransErr, TPeerId> {
|
enum InternalReachErr<TTransErr, TConnInfo> {
|
||||||
/// Error in the transport layer.
|
/// Error in the transport layer.
|
||||||
Transport(TransportError<TTransErr>),
|
Transport(TransportError<TTransErr>),
|
||||||
/// We successfully reached the peer, but there was a mismatch between the expected id and the
|
/// We successfully reached the peer, but there was a mismatch between the expected id and the
|
||||||
/// actual id of the peer.
|
/// actual id of the peer.
|
||||||
PeerIdMismatch {
|
PeerIdMismatch {
|
||||||
/// The peer id that the node reports.
|
/// The information about the bad connection.
|
||||||
obtained: TPeerId,
|
obtained: TConnInfo,
|
||||||
},
|
},
|
||||||
/// The negotiated `PeerId` is the same as the one of the local node.
|
/// The negotiated `PeerId` is the same as the one of the local node.
|
||||||
FoundLocalPeerId,
|
FoundLocalPeerId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTransErr, TPeerId> fmt::Display for InternalReachErr<TTransErr, TPeerId>
|
impl<TTransErr, TConnInfo> fmt::Display for InternalReachErr<TTransErr, TConnInfo>
|
||||||
where
|
where
|
||||||
TTransErr: fmt::Display,
|
TTransErr: fmt::Display,
|
||||||
TPeerId: fmt::Debug,
|
TConnInfo: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
@ -333,10 +341,10 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTransErr, TPeerId> error::Error for InternalReachErr<TTransErr, TPeerId>
|
impl<TTransErr, TConnInfo> error::Error for InternalReachErr<TTransErr, TConnInfo>
|
||||||
where
|
where
|
||||||
TTransErr: error::Error + 'static,
|
TTransErr: error::Error + 'static,
|
||||||
TPeerId: fmt::Debug,
|
TConnInfo: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
match self {
|
match self {
|
||||||
@ -363,22 +371,22 @@ pub enum PeerState {
|
|||||||
|
|
||||||
/// Error that can happen when trying to reach a node.
|
/// Error that can happen when trying to reach a node.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum RawSwarmReachError<TTransErr, TPeerId> {
|
pub enum RawSwarmReachError<TTransErr, TConnInfo> {
|
||||||
/// Error in the transport layer.
|
/// Error in the transport layer.
|
||||||
Transport(TransportError<TTransErr>),
|
Transport(TransportError<TTransErr>),
|
||||||
|
|
||||||
/// We successfully reached the peer, but there was a mismatch between the expected id and the
|
/// We successfully reached the peer, but there was a mismatch between the expected id and the
|
||||||
/// actual id of the peer.
|
/// actual id of the peer.
|
||||||
PeerIdMismatch {
|
PeerIdMismatch {
|
||||||
/// The peer id that the node reports.
|
/// The information about the other connection.
|
||||||
obtained: TPeerId,
|
obtained: TConnInfo,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTransErr, TPeerId> fmt::Display for RawSwarmReachError<TTransErr, TPeerId>
|
impl<TTransErr, TConnInfo> fmt::Display for RawSwarmReachError<TTransErr, TConnInfo>
|
||||||
where
|
where
|
||||||
TTransErr: fmt::Display,
|
TTransErr: fmt::Display,
|
||||||
TPeerId: fmt::Debug,
|
TConnInfo: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
@ -390,10 +398,10 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTransErr, TPeerId> error::Error for RawSwarmReachError<TTransErr, TPeerId>
|
impl<TTransErr, TConnInfo> error::Error for RawSwarmReachError<TTransErr, TConnInfo>
|
||||||
where
|
where
|
||||||
TTransErr: error::Error + 'static,
|
TTransErr: error::Error + 'static,
|
||||||
TPeerId: fmt::Debug,
|
TConnInfo: fmt::Debug,
|
||||||
{
|
{
|
||||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
match self {
|
match self {
|
||||||
@ -478,7 +486,7 @@ where TTransErr: error::Error + 'static
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// A new connection arrived on a listener.
|
/// A new connection arrived on a listener.
|
||||||
pub struct IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
pub struct IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where TTrans: Transport
|
where TTrans: Transport
|
||||||
{
|
{
|
||||||
/// The produced upgrade.
|
/// The produced upgrade.
|
||||||
@ -490,18 +498,18 @@ where TTrans: Transport
|
|||||||
/// Address used to send back data to the remote.
|
/// Address used to send back data to the remote.
|
||||||
send_back_addr: Multiaddr,
|
send_back_addr: Multiaddr,
|
||||||
/// Reference to the `active_nodes` field of the swarm.
|
/// Reference to the `active_nodes` field of the swarm.
|
||||||
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, (), TPeerId>,
|
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TConnInfo>, THandlerErr, (), TConnInfo, TPeerId>,
|
||||||
/// Reference to the `other_reach_attempts` field of the swarm.
|
/// Reference to the `other_reach_attempts` field of the swarm.
|
||||||
other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>,
|
other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TPeerId>
|
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (TPeerId, TMuxer)>,
|
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TTrans::ListenerUpgrade: Send + 'static,
|
TTrans::ListenerUpgrade: Send + 'static,
|
||||||
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
@ -510,7 +518,8 @@ where
|
|||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TInEvent: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
TPeerId: fmt::Debug + Eq + Hash + Clone + Send + 'static,
|
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
|
||||||
|
TPeerId: Eq + Hash + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
/// Starts processing the incoming connection and sets the handler to use for it.
|
/// Starts processing the incoming connection and sets the handler to use for it.
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -528,7 +537,7 @@ where
|
|||||||
let upgrade = self.upgrade
|
let upgrade = self.upgrade
|
||||||
.map_err(|err| InternalReachErr::Transport(TransportError::Other(err)))
|
.map_err(|err| InternalReachErr::Transport(TransportError::Other(err)))
|
||||||
.and_then(move |(peer_id, muxer)| {
|
.and_then(move |(peer_id, muxer)| {
|
||||||
if peer_id == local_peer_id {
|
if *peer_id.peer_id() == local_peer_id {
|
||||||
Err(InternalReachErr::FoundLocalPeerId)
|
Err(InternalReachErr::FoundLocalPeerId)
|
||||||
} else {
|
} else {
|
||||||
Ok((peer_id, muxer))
|
Ok((peer_id, muxer))
|
||||||
@ -542,8 +551,8 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where TTrans: Transport
|
where TTrans: Transport
|
||||||
{
|
{
|
||||||
/// Returns the `IncomingInfo` corresponding to this incoming connection.
|
/// Returns the `IncomingInfo` corresponding to this incoming connection.
|
||||||
@ -655,16 +664,17 @@ impl<'a> IncomingInfo<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TPeerId>
|
impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport + Clone,
|
TTrans: Transport + Clone,
|
||||||
TMuxer: StreamMuxer,
|
TMuxer: StreamMuxer,
|
||||||
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
|
||||||
|
TPeerId: Eq + Hash + Clone,
|
||||||
{
|
{
|
||||||
/// Creates a new node events stream.
|
/// Creates a new node events stream.
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -757,7 +767,7 @@ where
|
|||||||
/// The second parameter is the handler to use if we manage to reach a node.
|
/// The second parameter is the handler to use if we manage to reach a node.
|
||||||
pub fn dial(&mut self, addr: Multiaddr, handler: THandler) -> Result<(), TransportError<TTrans::Error>>
|
pub fn dial(&mut self, addr: Multiaddr, handler: THandler) -> Result<(), TransportError<TTrans::Error>>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (TPeerId, TMuxer)>,
|
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
@ -765,12 +775,14 @@ where
|
|||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TInEvent: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
|
TConnInfo: Send + 'static,
|
||||||
|
TPeerId: Send + 'static,
|
||||||
{
|
{
|
||||||
let local_peer_id = self.reach_attempts.local_peer_id.clone();
|
let local_peer_id = self.reach_attempts.local_peer_id.clone();
|
||||||
let future = self.transport().clone().dial(addr.clone())?
|
let future = self.transport().clone().dial(addr.clone())?
|
||||||
.map_err(|err| InternalReachErr::Transport(TransportError::Other(err)))
|
.map_err(|err| InternalReachErr::Transport(TransportError::Other(err)))
|
||||||
.and_then(move |(peer_id, muxer)| {
|
.and_then(move |(peer_id, muxer)| {
|
||||||
if peer_id == local_peer_id {
|
if *peer_id.peer_id() == local_peer_id {
|
||||||
Err(InternalReachErr::FoundLocalPeerId)
|
Err(InternalReachErr::FoundLocalPeerId)
|
||||||
} else {
|
} else {
|
||||||
Ok((peer_id, muxer))
|
Ok((peer_id, muxer))
|
||||||
@ -843,7 +855,7 @@ where
|
|||||||
self.reach_attempts
|
self.reach_attempts
|
||||||
.out_reach_attempts
|
.out_reach_attempts
|
||||||
.keys()
|
.keys()
|
||||||
.filter(move |p| !self.active_nodes.has_connection(&p))
|
.filter(move |p| !self.active_nodes.has_connection(p))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the list of addresses we're currently dialing without knowing the `PeerId` of.
|
/// Returns the list of addresses we're currently dialing without knowing the `PeerId` of.
|
||||||
@ -861,7 +873,7 @@ where
|
|||||||
|
|
||||||
/// Grants access to a struct that represents a peer.
|
/// Grants access to a struct that represents a peer.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn peer(&mut self, peer_id: TPeerId) -> Peer<'_, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> {
|
pub fn peer(&mut self, peer_id: TPeerId) -> Peer<'_, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> {
|
||||||
if peer_id == self.reach_attempts.local_peer_id {
|
if peer_id == self.reach_attempts.local_peer_id {
|
||||||
return Peer::LocalNode;
|
return Peer::LocalNode;
|
||||||
}
|
}
|
||||||
@ -904,7 +916,7 @@ where
|
|||||||
/// given peer.
|
/// given peer.
|
||||||
fn start_dial_out(&mut self, peer_id: TPeerId, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
|
fn start_dial_out(&mut self, peer_id: TPeerId, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (TPeerId, TMuxer)>,
|
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
@ -912,17 +924,19 @@ where
|
|||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TInEvent: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
|
TConnInfo: Send + 'static,
|
||||||
|
TPeerId: Send + 'static,
|
||||||
{
|
{
|
||||||
let reach_id = match self.transport().clone().dial(first.clone()) {
|
let reach_id = match self.transport().clone().dial(first.clone()) {
|
||||||
Ok(fut) => {
|
Ok(fut) => {
|
||||||
let expected_peer_id = peer_id.clone();
|
let expected_peer_id = peer_id.clone();
|
||||||
let fut = fut
|
let fut = fut
|
||||||
.map_err(|err| InternalReachErr::Transport(TransportError::Other(err)))
|
.map_err(|err| InternalReachErr::Transport(TransportError::Other(err)))
|
||||||
.and_then(move |(actual_peer_id, muxer)| {
|
.and_then(move |(actual_conn_info, muxer)| {
|
||||||
if actual_peer_id == expected_peer_id {
|
if *actual_conn_info.peer_id() == expected_peer_id {
|
||||||
Ok((actual_peer_id, muxer))
|
Ok((actual_conn_info, muxer))
|
||||||
} else {
|
} else {
|
||||||
Err(InternalReachErr::PeerIdMismatch { obtained: actual_peer_id })
|
Err(InternalReachErr::PeerIdMismatch { obtained: actual_conn_info })
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
self.active_nodes.add_reach_attempt(fut, handler)
|
self.active_nodes.add_reach_attempt(fut, handler)
|
||||||
@ -946,9 +960,9 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Provides an API similar to `Stream`, except that it cannot error.
|
/// Provides an API similar to `Stream`, except that it cannot error.
|
||||||
pub fn poll(&mut self) -> Async<RawSwarmEvent<'_, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>>
|
pub fn poll(&mut self) -> Async<RawSwarmEvent<'_, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (TPeerId, TMuxer)>,
|
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TTrans::ListenerUpgrade: Send + 'static,
|
TTrans::ListenerUpgrade: Send + 'static,
|
||||||
@ -957,10 +971,12 @@ where
|
|||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TInEvent: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
|
TConnInfo: Clone,
|
||||||
|
TPeerId: AsRef<[u8]> + Send + 'static,
|
||||||
{
|
{
|
||||||
// Start by polling the listeners for events, but only
|
// Start by polling the listeners for events, but only
|
||||||
// if numer of incoming connection does not exceed the limit.
|
// if numer of incoming connection does not exceed the limit.
|
||||||
@ -1013,11 +1029,11 @@ where
|
|||||||
out_event = e;
|
out_event = e;
|
||||||
}
|
}
|
||||||
Async::Ready(CollectionEvent::NodeClosed {
|
Async::Ready(CollectionEvent::NodeClosed {
|
||||||
peer_id,
|
conn_info,
|
||||||
error,
|
error,
|
||||||
..
|
..
|
||||||
}) => {
|
}) => {
|
||||||
let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
|
let endpoint = self.reach_attempts.connected_points.remove(conn_info.peer_id())
|
||||||
.expect("We insert into connected_points whenever a connection is \
|
.expect("We insert into connected_points whenever a connection is \
|
||||||
opened and remove only when a connection is closed; the \
|
opened and remove only when a connection is closed; the \
|
||||||
underlying API is guaranteed to always deliver a connection \
|
underlying API is guaranteed to always deliver a connection \
|
||||||
@ -1025,14 +1041,14 @@ where
|
|||||||
messages; QED");
|
messages; QED");
|
||||||
action = Default::default();
|
action = Default::default();
|
||||||
out_event = RawSwarmEvent::NodeClosed {
|
out_event = RawSwarmEvent::NodeClosed {
|
||||||
peer_id,
|
conn_info,
|
||||||
endpoint,
|
endpoint,
|
||||||
error,
|
error,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
Async::Ready(CollectionEvent::NodeEvent { peer, event }) => {
|
Async::Ready(CollectionEvent::NodeEvent { peer, event }) => {
|
||||||
action = Default::default();
|
action = Default::default();
|
||||||
out_event = RawSwarmEvent::NodeEvent { peer_id: peer.id().clone(), event };
|
out_event = RawSwarmEvent::NodeEvent { conn_info: peer.info().clone(), event };
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1082,18 +1098,19 @@ impl<THandler, TPeerId> Default for ActionItem<THandler, TPeerId> {
|
|||||||
///
|
///
|
||||||
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
|
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
|
||||||
/// > panics will likely happen.
|
/// > panics will likely happen.
|
||||||
fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>(
|
fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>(
|
||||||
reach_attempts: &mut ReachAttempts<TPeerId>,
|
reach_attempts: &mut ReachAttempts<TPeerId>,
|
||||||
event: CollectionReachEvent<'_, TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, (), TPeerId>,
|
event: CollectionReachEvent<'_, TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TConnInfo>, THandlerErr, (), TConnInfo, TPeerId>,
|
||||||
) -> (ActionItem<THandler, TPeerId>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>)
|
) -> (ActionItem<THandler, TPeerId>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>)
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (TPeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TInEvent: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Clone + Send + 'static,
|
||||||
|
TPeerId: Eq + Hash + AsRef<[u8]> + Clone,
|
||||||
{
|
{
|
||||||
// We first start looking in the incoming attempts. While this makes the code less optimal,
|
// We first start looking in the incoming attempts. While this makes the code less optimal,
|
||||||
// it also makes the logic easier.
|
// it also makes the logic easier.
|
||||||
@ -1144,20 +1161,24 @@ where
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let (outcome, peer_id) = event.accept(());
|
let (outcome, conn_info) = event.accept(());
|
||||||
if let CollectionNodeAccept::ReplacedExisting(()) = outcome {
|
if let CollectionNodeAccept::ReplacedExisting(old_info, ()) = outcome {
|
||||||
let closed_endpoint = closed_endpoint
|
let closed_endpoint = closed_endpoint
|
||||||
.expect("We insert into connected_points whenever a connection is opened and \
|
.expect("We insert into connected_points whenever a connection is opened and \
|
||||||
remove only when a connection is closed; the underlying API is \
|
remove only when a connection is closed; the underlying API is \
|
||||||
guaranteed to always deliver a connection closed message after it has \
|
guaranteed to always deliver a connection closed message after it has \
|
||||||
been opened, and no two closed messages; QED");
|
been opened, and no two closed messages; QED");
|
||||||
return (action, RawSwarmEvent::Replaced {
|
return (action, RawSwarmEvent::Replaced {
|
||||||
peer_id,
|
new_info: conn_info,
|
||||||
|
old_info,
|
||||||
endpoint: opened_endpoint,
|
endpoint: opened_endpoint,
|
||||||
closed_endpoint,
|
closed_endpoint,
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
return (action, RawSwarmEvent::Connected { peer_id, endpoint: opened_endpoint });
|
return (action, RawSwarmEvent::Connected {
|
||||||
|
conn_info,
|
||||||
|
endpoint: opened_endpoint
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1182,20 +1203,25 @@ where
|
|||||||
let closed_endpoint = reach_attempts.connected_points
|
let closed_endpoint = reach_attempts.connected_points
|
||||||
.insert(event.peer_id().clone(), opened_endpoint.clone());
|
.insert(event.peer_id().clone(), opened_endpoint.clone());
|
||||||
|
|
||||||
let (outcome, peer_id) = event.accept(());
|
let (outcome, conn_info) = event.accept(());
|
||||||
if let CollectionNodeAccept::ReplacedExisting(()) = outcome {
|
if let CollectionNodeAccept::ReplacedExisting(old_info, ()) = outcome {
|
||||||
let closed_endpoint = closed_endpoint
|
let closed_endpoint = closed_endpoint
|
||||||
.expect("We insert into connected_points whenever a connection is opened and \
|
.expect("We insert into connected_points whenever a connection is opened and \
|
||||||
remove only when a connection is closed; the underlying API is guaranteed \
|
remove only when a connection is closed; the underlying API is guaranteed \
|
||||||
to always deliver a connection closed message after it has been opened, \
|
to always deliver a connection closed message after it has been opened, \
|
||||||
and no two closed messages; QED");
|
and no two closed messages; QED");
|
||||||
return (Default::default(), RawSwarmEvent::Replaced {
|
return (Default::default(), RawSwarmEvent::Replaced {
|
||||||
peer_id,
|
new_info: conn_info,
|
||||||
|
old_info,
|
||||||
endpoint: opened_endpoint,
|
endpoint: opened_endpoint,
|
||||||
closed_endpoint,
|
closed_endpoint,
|
||||||
});
|
});
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
return (Default::default(), RawSwarmEvent::Connected { peer_id, endpoint: opened_endpoint });
|
return (Default::default(), RawSwarmEvent::Connected {
|
||||||
|
conn_info,
|
||||||
|
endpoint: opened_endpoint
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1226,14 +1252,15 @@ where
|
|||||||
///
|
///
|
||||||
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
|
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
|
||||||
/// > panics will likely happen.
|
/// > panics will likely happen.
|
||||||
fn handle_reach_error<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>(
|
fn handle_reach_error<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>(
|
||||||
reach_attempts: &mut ReachAttempts<TPeerId>,
|
reach_attempts: &mut ReachAttempts<TPeerId>,
|
||||||
reach_id: ReachAttemptId,
|
reach_id: ReachAttemptId,
|
||||||
error: InternalReachErr<TTrans::Error, TPeerId>,
|
error: InternalReachErr<TTrans::Error, TConnInfo>,
|
||||||
handler: THandler,
|
handler: THandler,
|
||||||
) -> (ActionItem<THandler, TPeerId>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>)
|
) -> (ActionItem<THandler, TPeerId>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>)
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
|
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
|
||||||
TPeerId: Eq + Hash + Clone,
|
TPeerId: Eq + Hash + Clone,
|
||||||
{
|
{
|
||||||
// Search for the attempt in `out_reach_attempts`.
|
// Search for the attempt in `out_reach_attempts`.
|
||||||
@ -1343,31 +1370,32 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// State of a peer in the system.
|
/// State of a peer in the system.
|
||||||
pub enum Peer<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a>
|
pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
{
|
{
|
||||||
/// We are connected to this peer.
|
/// We are connected to this peer.
|
||||||
Connected(PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
|
Connected(PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>),
|
||||||
|
|
||||||
/// We are currently attempting to connect to this peer.
|
/// We are currently attempting to connect to this peer.
|
||||||
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
|
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>),
|
||||||
|
|
||||||
/// We are not connected to this peer at all.
|
/// We are not connected to this peer at all.
|
||||||
///
|
///
|
||||||
/// > **Note**: It is however possible that a pending incoming connection is being negotiated
|
/// > **Note**: It is however possible that a pending incoming connection is being negotiated
|
||||||
/// > and will connect to this peer, but we don't know it yet.
|
/// > and will connect to this peer, but we don't know it yet.
|
||||||
NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
|
NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>),
|
||||||
|
|
||||||
/// The requested peer is the local node.
|
/// The requested peer is the local node.
|
||||||
LocalNode,
|
LocalNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> fmt::Debug for
|
||||||
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
TPeerId: Eq + Hash + fmt::Debug,
|
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId>,
|
||||||
|
TPeerId: fmt::Debug + Eq + Hash,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
|
||||||
match *self {
|
match *self {
|
||||||
@ -1396,10 +1424,10 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add other similar methods that wrap to the ones of `PeerNotConnected`
|
// TODO: add other similar methods that wrap to the ones of `PeerNotConnected`
|
||||||
impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (TPeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
@ -1407,15 +1435,16 @@ where
|
|||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
TInEvent: Send + 'static,
|
TInEvent: Send + 'static,
|
||||||
TOutEvent: Send + 'static,
|
TOutEvent: Send + 'static,
|
||||||
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
|
||||||
|
TPeerId: Eq + Hash + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
/// If we are connected, returns the `PeerConnected`.
|
/// If we are connected, returns the `PeerConnected`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn into_connected(self) -> Option<PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>> {
|
pub fn into_connected(self) -> Option<PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>> {
|
||||||
match self {
|
match self {
|
||||||
Peer::Connected(peer) => Some(peer),
|
Peer::Connected(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -1424,7 +1453,7 @@ where
|
|||||||
|
|
||||||
/// If a connection is pending, returns the `PeerPendingConnect`.
|
/// If a connection is pending, returns the `PeerPendingConnect`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn into_pending_connect(self) -> Option<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>> {
|
pub fn into_pending_connect(self) -> Option<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>> {
|
||||||
match self {
|
match self {
|
||||||
Peer::PendingConnect(peer) => Some(peer),
|
Peer::PendingConnect(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -1433,7 +1462,7 @@ where
|
|||||||
|
|
||||||
/// If we are not connected, returns the `PeerNotConnected`.
|
/// If we are not connected, returns the `PeerNotConnected`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn into_not_connected(self) -> Option<PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>> {
|
pub fn into_not_connected(self) -> Option<PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>> {
|
||||||
match self {
|
match self {
|
||||||
Peer::NotConnected(peer) => Some(peer),
|
Peer::NotConnected(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -1448,7 +1477,7 @@ where
|
|||||||
/// Returns an error if we are `LocalNode`.
|
/// Returns an error if we are `LocalNode`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn or_connect(self, addr: Multiaddr, handler: THandler)
|
pub fn or_connect(self, addr: Multiaddr, handler: THandler)
|
||||||
-> Result<PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>, Self>
|
-> Result<PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>, Self>
|
||||||
{
|
{
|
||||||
self.or_connect_with(move |_| addr, handler)
|
self.or_connect_with(move |_| addr, handler)
|
||||||
}
|
}
|
||||||
@ -1462,7 +1491,7 @@ where
|
|||||||
/// Returns an error if we are `LocalNode`.
|
/// Returns an error if we are `LocalNode`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn or_connect_with<TFn>(self, addr: TFn, handler: THandler)
|
pub fn or_connect_with<TFn>(self, addr: TFn, handler: THandler)
|
||||||
-> Result<PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>, Self>
|
-> Result<PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>, Self>
|
||||||
where
|
where
|
||||||
TFn: FnOnce(&TPeerId) -> Multiaddr,
|
TFn: FnOnce(&TPeerId) -> Multiaddr,
|
||||||
{
|
{
|
||||||
@ -1479,22 +1508,23 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Peer we are potentially going to connect to.
|
/// Peer we are potentially going to connect to.
|
||||||
pub enum PeerPotentialConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a>
|
pub enum PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport
|
TTrans: Transport
|
||||||
{
|
{
|
||||||
/// We are connected to this peer.
|
/// We are connected to this peer.
|
||||||
Connected(PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
|
Connected(PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>),
|
||||||
|
|
||||||
/// We are currently attempting to connect to this peer.
|
/// We are currently attempting to connect to this peer.
|
||||||
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>),
|
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
TPeerId: Eq + Hash + Clone,
|
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
|
||||||
|
TPeerId: Eq + Hash,
|
||||||
{
|
{
|
||||||
/// Closes the connection or the connection attempt.
|
/// Closes the connection or the connection attempt.
|
||||||
// TODO: consider returning a `PeerNotConnected`
|
// TODO: consider returning a `PeerNotConnected`
|
||||||
@ -1508,7 +1538,7 @@ where
|
|||||||
|
|
||||||
/// If we are connected, returns the `PeerConnected`.
|
/// If we are connected, returns the `PeerConnected`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn into_connected(self) -> Option<PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>> {
|
pub fn into_connected(self) -> Option<PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>> {
|
||||||
match self {
|
match self {
|
||||||
PeerPotentialConnect::Connected(peer) => Some(peer),
|
PeerPotentialConnect::Connected(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -1517,7 +1547,7 @@ where
|
|||||||
|
|
||||||
/// If a connection is pending, returns the `PeerPendingConnect`.
|
/// If a connection is pending, returns the `PeerPendingConnect`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn into_pending_connect(self) -> Option<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>> {
|
pub fn into_pending_connect(self) -> Option<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>> {
|
||||||
match self {
|
match self {
|
||||||
PeerPotentialConnect::PendingConnect(peer) => Some(peer),
|
PeerPotentialConnect::PendingConnect(peer) => Some(peer),
|
||||||
_ => None,
|
_ => None,
|
||||||
@ -1526,11 +1556,11 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Access to a peer we are connected to.
|
/// Access to a peer we are connected to.
|
||||||
pub struct PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
pub struct PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where TTrans: Transport,
|
where TTrans: Transport,
|
||||||
{
|
{
|
||||||
/// Reference to the `active_nodes` of the parent.
|
/// Reference to the `active_nodes` of the parent.
|
||||||
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, (), TPeerId>,
|
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TConnInfo>, THandlerErr, (), TConnInfo, TPeerId>,
|
||||||
/// Reference to the `connected_points` field of the parent.
|
/// Reference to the `connected_points` field of the parent.
|
||||||
connected_points: &'a mut FnvHashMap<TPeerId, ConnectedPoint>,
|
connected_points: &'a mut FnvHashMap<TPeerId, ConnectedPoint>,
|
||||||
/// Reference to the `out_reach_attempts` field of the parent.
|
/// Reference to the `out_reach_attempts` field of the parent.
|
||||||
@ -1538,10 +1568,11 @@ where TTrans: Transport,
|
|||||||
peer_id: TPeerId,
|
peer_id: TPeerId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> PeerConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
TPeerId: Eq + Hash + Clone,
|
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
|
||||||
|
TPeerId: Eq + Hash,
|
||||||
{
|
{
|
||||||
/// Closes the connection to this node.
|
/// Closes the connection to this node.
|
||||||
///
|
///
|
||||||
@ -1582,19 +1613,20 @@ where
|
|||||||
|
|
||||||
/// Access to a peer we are attempting to connect to.
|
/// Access to a peer we are attempting to connect to.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
pub struct PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport
|
TTrans: Transport
|
||||||
{
|
{
|
||||||
attempt: OccupiedEntry<'a, TPeerId, OutReachAttempt>,
|
attempt: OccupiedEntry<'a, TPeerId, OutReachAttempt>,
|
||||||
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TPeerId>, THandlerErr, (), TPeerId>,
|
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, InternalReachErr<TTrans::Error, TConnInfo>, THandlerErr, (), TConnInfo, TPeerId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
TPeerId: Eq + Hash + Clone,
|
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
|
||||||
|
TPeerId: Eq + Hash,
|
||||||
{
|
{
|
||||||
/// Interrupt this connection attempt.
|
/// Interrupt this connection attempt.
|
||||||
// TODO: consider returning a PeerNotConnected; however that is really pain in terms of
|
// TODO: consider returning a PeerNotConnected; however that is really pain in terms of
|
||||||
@ -1646,16 +1678,16 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Access to a peer we're not connected to.
|
/// Access to a peer we're not connected to.
|
||||||
pub struct PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
pub struct PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
{
|
{
|
||||||
peer_id: TPeerId,
|
peer_id: TPeerId,
|
||||||
nodes: &'a mut RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>,
|
nodes: &'a mut RawSwarm<TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for
|
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId> fmt::Debug for
|
||||||
PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport,
|
TTrans: Transport,
|
||||||
TPeerId: fmt::Debug,
|
TPeerId: fmt::Debug,
|
||||||
@ -1667,16 +1699,16 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TPeerId>
|
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TTrans: Transport<Output = (TPeerId, TMuxer)> + Clone,
|
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
|
||||||
TTrans::Error: Send + 'static,
|
TTrans::Error: Send + 'static,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||||
TMuxer::OutboundSubstream: Send,
|
TMuxer::OutboundSubstream: Send,
|
||||||
TMuxer::Substream: Send,
|
TMuxer::Substream: Send,
|
||||||
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
|
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
|
||||||
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
|
||||||
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
|
||||||
THandlerErr: error::Error + Send + 'static,
|
THandlerErr: error::Error + Send + 'static,
|
||||||
@ -1689,9 +1721,10 @@ where
|
|||||||
/// the whole connection is immediately closed.
|
/// the whole connection is immediately closed.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn connect(self, addr: Multiaddr, handler: THandler)
|
pub fn connect(self, addr: Multiaddr, handler: THandler)
|
||||||
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
|
||||||
|
TPeerId: Eq + Hash + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
self.connect_inner(handler, addr, Vec::new())
|
self.connect_inner(handler, addr, Vec::new())
|
||||||
}
|
}
|
||||||
@ -1706,10 +1739,11 @@ where
|
|||||||
/// the whole connection is immediately closed.
|
/// the whole connection is immediately closed.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn connect_iter<TIter>(self, addrs: TIter, handler: THandler)
|
pub fn connect_iter<TIter>(self, addrs: TIter, handler: THandler)
|
||||||
-> Result<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>, Self>
|
-> Result<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>, Self>
|
||||||
where
|
where
|
||||||
TIter: IntoIterator<Item = Multiaddr>,
|
TIter: IntoIterator<Item = Multiaddr>,
|
||||||
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
|
||||||
|
TPeerId: Eq + Hash + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
let mut addrs = addrs.into_iter();
|
let mut addrs = addrs.into_iter();
|
||||||
let first = match addrs.next() {
|
let first = match addrs.next() {
|
||||||
@ -1722,9 +1756,10 @@ where
|
|||||||
|
|
||||||
/// Inner implementation of `connect`.
|
/// Inner implementation of `connect`.
|
||||||
fn connect_inner(self, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
|
fn connect_inner(self, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
|
||||||
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>
|
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>
|
||||||
where
|
where
|
||||||
TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static,
|
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
|
||||||
|
TPeerId: Eq + Hash + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
self.nodes.start_dial_out(self.peer_id.clone(), handler, first, rest);
|
self.nodes.start_dial_out(self.peer_id.clone(), handler, first, rest);
|
||||||
PeerPendingConnect {
|
PeerPendingConnect {
|
||||||
|
@ -107,7 +107,7 @@ fn successful_dial_reaches_a_node() {
|
|||||||
let mut swarm = swarm_fut.lock();
|
let mut swarm = swarm_fut.lock();
|
||||||
let poll_res = swarm.poll();
|
let poll_res = swarm.poll();
|
||||||
match poll_res {
|
match poll_res {
|
||||||
Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))),
|
Async::Ready(RawSwarmEvent::Connected { conn_info, .. }) => Ok(Async::Ready(Some(conn_info))),
|
||||||
_ => Ok(Async::Ready(None))
|
_ => Ok(Async::Ready(None))
|
||||||
}
|
}
|
||||||
})).expect("tokio works");
|
})).expect("tokio works");
|
||||||
@ -172,7 +172,7 @@ fn broadcasted_events_reach_active_nodes() {
|
|||||||
let mut swarm = swarm_fut.lock();
|
let mut swarm = swarm_fut.lock();
|
||||||
let poll_res = swarm.poll();
|
let poll_res = swarm.poll();
|
||||||
match poll_res {
|
match poll_res {
|
||||||
Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))),
|
Async::Ready(RawSwarmEvent::Connected { conn_info, .. }) => Ok(Async::Ready(Some(conn_info))),
|
||||||
_ => Ok(Async::Ready(None))
|
_ => Ok(Async::Ready(None))
|
||||||
}
|
}
|
||||||
})).expect("tokio works");
|
})).expect("tokio works");
|
||||||
@ -185,7 +185,7 @@ fn broadcasted_events_reach_active_nodes() {
|
|||||||
let mut swarm = swarm_fut.lock();
|
let mut swarm = swarm_fut.lock();
|
||||||
match swarm.poll() {
|
match swarm.poll() {
|
||||||
Async::Ready(event) => {
|
Async::Ready(event) => {
|
||||||
assert_matches!(event, RawSwarmEvent::NodeEvent { peer_id: _, event: inner_event } => {
|
assert_matches!(event, RawSwarmEvent::NodeEvent { conn_info: _, event: inner_event } => {
|
||||||
// The event we sent reached the node and triggered sending the out event we told it to return
|
// The event we sent reached the node and triggered sending the out event we told it to return
|
||||||
assert_matches!(inner_event, OutEvent::Custom("from handler 1"));
|
assert_matches!(inner_event, OutEvent::Custom("from handler 1"));
|
||||||
});
|
});
|
||||||
@ -236,7 +236,7 @@ fn querying_for_connected_peer() {
|
|||||||
let mut swarm = swarm_fut.lock();
|
let mut swarm = swarm_fut.lock();
|
||||||
let poll_res = swarm.poll();
|
let poll_res = swarm.poll();
|
||||||
match poll_res {
|
match poll_res {
|
||||||
Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => Ok(Async::Ready(Some(peer_id))),
|
Async::Ready(RawSwarmEvent::Connected { conn_info, .. }) => Ok(Async::Ready(Some(conn_info))),
|
||||||
_ => Ok(Async::Ready(None))
|
_ => Ok(Async::Ready(None))
|
||||||
}
|
}
|
||||||
})).expect("tokio works");
|
})).expect("tokio works");
|
||||||
@ -385,8 +385,8 @@ fn yields_node_error_when_there_is_an_error_after_successful_connect() {
|
|||||||
let expected_peer_id = peer_id.clone();
|
let expected_peer_id = peer_id.clone();
|
||||||
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
|
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
|
||||||
let mut swarm = swarm_fut.lock();
|
let mut swarm = swarm_fut.lock();
|
||||||
assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeClosed { peer_id, .. }) => {
|
assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeClosed { conn_info, .. }) => {
|
||||||
assert_eq!(peer_id, expected_peer_id);
|
assert_eq!(conn_info, expected_peer_id);
|
||||||
});
|
});
|
||||||
Ok(Async::Ready(()))
|
Ok(Async::Ready(()))
|
||||||
})).expect("tokio works");
|
})).expect("tokio works");
|
||||||
|
@ -22,6 +22,7 @@ use crate::{
|
|||||||
Transport, Multiaddr, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName,
|
Transport, Multiaddr, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName,
|
||||||
muxing::StreamMuxer,
|
muxing::StreamMuxer,
|
||||||
nodes::{
|
nodes::{
|
||||||
|
collection::ConnectionInfo,
|
||||||
handled_node::NodeHandler,
|
handled_node::NodeHandler,
|
||||||
node::Substream,
|
node::Substream,
|
||||||
raw_swarm::{self, RawSwarm, RawSwarmEvent}
|
raw_swarm::{self, RawSwarm, RawSwarmEvent}
|
||||||
@ -45,6 +46,8 @@ where TTransport: Transport,
|
|||||||
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
|
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
|
||||||
NodeHandlerWrapperBuilder<TBehaviour::ProtocolsHandler>,
|
NodeHandlerWrapperBuilder<TBehaviour::ProtocolsHandler>,
|
||||||
NodeHandlerWrapperError<<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error>,
|
NodeHandlerWrapperError<<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error>,
|
||||||
|
PeerId,
|
||||||
|
PeerId,
|
||||||
>,
|
>,
|
||||||
|
|
||||||
/// Handles which nodes to connect to and how to handle the events sent back by the protocol
|
/// Handles which nodes to connect to and how to handle the events sent back by the protocol
|
||||||
@ -240,17 +243,17 @@ where TBehaviour: NetworkBehaviour,
|
|||||||
|
|
||||||
match self.raw_swarm.poll() {
|
match self.raw_swarm.poll() {
|
||||||
Async::NotReady => raw_swarm_not_ready = true,
|
Async::NotReady => raw_swarm_not_ready = true,
|
||||||
Async::Ready(RawSwarmEvent::NodeEvent { peer_id, event }) => {
|
Async::Ready(RawSwarmEvent::NodeEvent { conn_info, event }) => {
|
||||||
self.behaviour.inject_node_event(peer_id, event);
|
self.behaviour.inject_node_event(conn_info.peer_id().clone(), event);
|
||||||
},
|
},
|
||||||
Async::Ready(RawSwarmEvent::Connected { peer_id, endpoint }) => {
|
Async::Ready(RawSwarmEvent::Connected { conn_info, endpoint }) => {
|
||||||
self.behaviour.inject_connected(peer_id, endpoint);
|
self.behaviour.inject_connected(conn_info.peer_id().clone(), endpoint);
|
||||||
},
|
},
|
||||||
Async::Ready(RawSwarmEvent::NodeClosed { peer_id, endpoint, .. }) => {
|
Async::Ready(RawSwarmEvent::NodeClosed { conn_info, endpoint, .. }) => {
|
||||||
self.behaviour.inject_disconnected(&peer_id, endpoint);
|
self.behaviour.inject_disconnected(conn_info.peer_id(), endpoint);
|
||||||
},
|
},
|
||||||
Async::Ready(RawSwarmEvent::Replaced { peer_id, closed_endpoint, endpoint }) => {
|
Async::Ready(RawSwarmEvent::Replaced { new_info, closed_endpoint, endpoint, .. }) => {
|
||||||
self.behaviour.inject_replaced(peer_id, closed_endpoint, endpoint);
|
self.behaviour.inject_replaced(new_info.peer_id().clone(), closed_endpoint, endpoint);
|
||||||
},
|
},
|
||||||
Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => {
|
Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => {
|
||||||
let handler = self.behaviour.new_handler();
|
let handler = self.behaviour.new_handler();
|
||||||
|
@ -184,13 +184,13 @@ fn raw_swarm_simultaneous_connect() {
|
|||||||
assert_eq!(swarm1_step, 2);
|
assert_eq!(swarm1_step, 2);
|
||||||
swarm1_step = 3;
|
swarm1_step = 3;
|
||||||
},
|
},
|
||||||
Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => {
|
Async::Ready(RawSwarmEvent::Connected { conn_info, .. }) => {
|
||||||
assert_eq!(peer_id, *swarm2.local_peer_id());
|
assert_eq!(conn_info, *swarm2.local_peer_id());
|
||||||
assert_eq!(swarm1_step, 1);
|
assert_eq!(swarm1_step, 1);
|
||||||
swarm1_step = 2;
|
swarm1_step = 2;
|
||||||
},
|
},
|
||||||
Async::Ready(RawSwarmEvent::Replaced { peer_id, .. }) => {
|
Async::Ready(RawSwarmEvent::Replaced { new_info, .. }) => {
|
||||||
assert_eq!(peer_id, *swarm2.local_peer_id());
|
assert_eq!(new_info, *swarm2.local_peer_id());
|
||||||
assert_eq!(swarm1_step, 2);
|
assert_eq!(swarm1_step, 2);
|
||||||
swarm1_step = 3;
|
swarm1_step = 3;
|
||||||
},
|
},
|
||||||
@ -208,13 +208,13 @@ fn raw_swarm_simultaneous_connect() {
|
|||||||
assert_eq!(swarm2_step, 2);
|
assert_eq!(swarm2_step, 2);
|
||||||
swarm2_step = 3;
|
swarm2_step = 3;
|
||||||
},
|
},
|
||||||
Async::Ready(RawSwarmEvent::Connected { peer_id, .. }) => {
|
Async::Ready(RawSwarmEvent::Connected { conn_info, .. }) => {
|
||||||
assert_eq!(peer_id, *swarm1.local_peer_id());
|
assert_eq!(conn_info, *swarm1.local_peer_id());
|
||||||
assert_eq!(swarm2_step, 1);
|
assert_eq!(swarm2_step, 1);
|
||||||
swarm2_step = 2;
|
swarm2_step = 2;
|
||||||
},
|
},
|
||||||
Async::Ready(RawSwarmEvent::Replaced { peer_id, .. }) => {
|
Async::Ready(RawSwarmEvent::Replaced { new_info, .. }) => {
|
||||||
assert_eq!(peer_id, *swarm1.local_peer_id());
|
assert_eq!(new_info, *swarm1.local_peer_id());
|
||||||
assert_eq!(swarm2_step, 2);
|
assert_eq!(swarm2_step, 2);
|
||||||
swarm2_step = 3;
|
swarm2_step = 3;
|
||||||
},
|
},
|
||||||
|
Reference in New Issue
Block a user