core/: Remove TInEvent and TOutEvent (#2183)

TInEvent and TOutEvent are implied through THandler and thus
superflucious. Both are removed in favor of a derivation through
THandler.
This commit is contained in:
Max Inden
2021-08-11 12:41:28 +02:00
committed by GitHub
parent 7391b6e3f3
commit 008561283e
24 changed files with 356 additions and 244 deletions

View File

@ -12,9 +12,18 @@
- Add `From<&PublicKey> for PeerId` (see [PR 2145]).
- Remove `TInEvent` and `TOutEvent` trait paramters on most public types.
`TInEvent` and `TOutEvent` are implied through `THandler` and thus
superflucious. Both are removed in favor of a derivation through `THandler`
(see [PR 2183]).
- Require `ConnectionHandler::{InEvent,OutEvent,Error}` to implement `Debug`
(see [PR 2183]).
[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137/
[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
# 0.29.0 [2021-07-12]

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
mod error;
mod handler;
pub(crate) mod handler;
mod listeners;
mod substream;

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use crate::Multiaddr;
use std::{task::Context, task::Poll};
use std::{fmt::Debug, task::Context, task::Poll};
use super::{Connected, SubstreamEndpoint};
/// The interface of a connection handler.
@ -30,14 +30,14 @@ pub trait ConnectionHandler {
///
/// See also [`EstablishedConnection::notify_handler`](super::EstablishedConnection::notify_handler)
/// and [`ConnectionHandler::inject_event`].
type InEvent;
type InEvent: Debug + Send + 'static;
/// The outbound type of events that the handler emits to the `Network`
/// through [`ConnectionHandler::poll`].
///
/// See also [`NetworkEvent::ConnectionEvent`](crate::network::NetworkEvent::ConnectionEvent).
type OutEvent;
type OutEvent: Debug + Send + 'static;
/// The type of errors that the handler can produce when polled by the `Network`.
type Error;
type Error: Debug + Send + 'static;
/// The type of the substream containing the data.
type Substream;
/// Information about a substream. Can be sent to the handler through a `SubstreamEndpoint`,
@ -91,6 +91,10 @@ where
}
}
pub(crate) type THandlerInEvent<THandler> = <<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent;
pub(crate) type THandlerOutEvent<THandler> = <<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent;
pub(crate) type THandlerError<THandler> = <<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::Error;
/// Event produced by a handler.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {
@ -127,4 +131,3 @@ impl<TOutboundOpenInfo, TCustom> ConnectionHandlerEvent<TOutboundOpenInfo, TCust
}
}
}

View File

@ -44,7 +44,12 @@ use super::{
ConnectionHandler,
IntoConnectionHandler,
PendingConnectionError,
Substream
Substream,
handler::{
THandlerInEvent,
THandlerOutEvent,
THandlerError,
},
};
use task::{Task, TaskId};
@ -88,7 +93,7 @@ impl ConnectionId {
}
/// A connection `Manager` orchestrates the I/O of a set of connections.
pub struct Manager<I, O, H, E, HE> {
pub struct Manager<H: IntoConnectionHandler, E> {
/// The tasks of the managed connections.
///
/// Each managed connection is associated with a (background) task
@ -96,7 +101,7 @@ pub struct Manager<I, O, H, E, HE> {
/// background task via a channel. Closing that channel (i.e. dropping
/// the sender in the associated `TaskInfo`) stops the background task,
/// which will attempt to gracefully close the connection.
tasks: FnvHashMap<TaskId, TaskInfo<I>>,
tasks: FnvHashMap<TaskId, TaskInfo<THandlerInEvent<H>>>,
/// Next available identifier for a new connection / task.
next_task_id: TaskId,
@ -115,13 +120,13 @@ pub struct Manager<I, O, H, E, HE> {
/// Sender distributed to managed tasks for reporting events back
/// to the manager.
events_tx: mpsc::Sender<task::Event<O, H, E, HE>>,
events_tx: mpsc::Sender<task::Event<H, E>>,
/// Receiver for events reported from managed tasks.
events_rx: mpsc::Receiver<task::Event<O, H, E, HE>>
events_rx: mpsc::Receiver<task::Event<H, E>>
}
impl<I, O, H, E, HE> fmt::Debug for Manager<I, O, H, E, HE>
impl<H: IntoConnectionHandler, E> fmt::Debug for Manager<H, E>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_map()
@ -179,7 +184,7 @@ enum TaskState {
/// Events produced by the [`Manager`].
#[derive(Debug)]
pub enum Event<'a, I, O, H, TE, HE> {
pub enum Event<'a, H: IntoConnectionHandler, TE> {
/// A connection attempt has failed.
PendingConnectionError {
/// The connection ID.
@ -206,27 +211,27 @@ pub enum Event<'a, I, O, H, TE, HE> {
connected: Connected,
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<HE>>,
error: Option<ConnectionError<THandlerError<H>>>,
},
/// A connection has been established.
ConnectionEstablished {
/// The entry associated with the new connection.
entry: EstablishedEntry<'a, I>,
entry: EstablishedEntry<'a, THandlerInEvent<H>>,
},
/// A connection handler has produced an event.
ConnectionEvent {
/// The entry associated with the connection that produced the event.
entry: EstablishedEntry<'a, I>,
entry: EstablishedEntry<'a, THandlerInEvent<H>>,
/// The produced event.
event: O
event: THandlerOutEvent<H>
},
/// A connection to a node has changed its address.
AddressChange {
/// The entry associated with the connection that changed address.
entry: EstablishedEntry<'a, I>,
entry: EstablishedEntry<'a, THandlerInEvent<H>>,
/// The former [`ConnectedPoint`].
old_endpoint: ConnectedPoint,
/// The new [`ConnectedPoint`].
@ -234,7 +239,7 @@ pub enum Event<'a, I, O, H, TE, HE> {
},
}
impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
/// Creates a new connection manager.
pub fn new(config: ManagerConfig) -> Self {
let (tx, rx) = mpsc::channel(config.task_event_buffer_size);
@ -255,19 +260,13 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
/// processing the node's events.
pub fn add_pending<F, M>(&mut self, future: F, handler: H) -> ConnectionId
where
I: Send + 'static,
O: Send + 'static,
TE: error::Error + Send + 'static,
HE: error::Error + Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
F: Future<Output = ConnectResult<M, TE>> + Send + 'static,
H: IntoConnectionHandler + Send + 'static,
H::Handler: ConnectionHandler<
Substream = Substream<M>,
InEvent = I,
OutEvent = O,
Error = HE
> + Send + 'static,
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
{
@ -293,15 +292,9 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
H: IntoConnectionHandler + Send + 'static,
H::Handler: ConnectionHandler<
Substream = Substream<M>,
InEvent = I,
OutEvent = O,
Error = HE
> + Send + 'static,
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TE: error::Error + Send + 'static,
HE: error::Error + Send + 'static,
I: Send + 'static,
O: Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
{
@ -313,7 +306,7 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
sender: tx, state: TaskState::Established(info)
});
let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _, _, _>>> =
let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _>>> =
Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn));
if let Some(executor) = &mut self.executor {
@ -326,7 +319,7 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
}
/// Gets an entry for a managed connection, if it exists.
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, I>> {
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, THandlerInEvent<H>>> {
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
Some(Entry::new(task))
} else {
@ -340,7 +333,7 @@ impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
}
/// Polls the manager for events relating to the managed connections.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, I, O, H, TE, HE>> {
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, H, TE>> {
// Advance the content of `local_spawns`.
while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}

View File

@ -31,6 +31,11 @@ use crate::{
IntoConnectionHandler,
PendingConnectionError,
Substream,
handler::{
THandlerInEvent,
THandlerOutEvent,
THandlerError,
},
},
};
use futures::{prelude::*, channel::mpsc, stream};
@ -53,7 +58,7 @@ pub enum Command<T> {
/// Events that a task can emit to its manager.
#[derive(Debug)]
pub enum Event<T, H, TE, HE> {
pub enum Event<H: IntoConnectionHandler, TE> {
/// A connection to a node has succeeded.
Established { id: TaskId, info: Connected },
/// A pending connection failed.
@ -61,15 +66,15 @@ pub enum Event<T, H, TE, HE> {
/// A node we are connected to has changed its address.
AddressChange { id: TaskId, new_address: Multiaddr },
/// Notify the manager of an event from the connection.
Notify { id: TaskId, event: T },
Notify { id: TaskId, event: THandlerOutEvent<H> },
/// A connection closed, possibly due to an error.
///
/// If `error` is `None`, the connection has completed
/// an active orderly close.
Closed { id: TaskId, error: Option<ConnectionError<HE>> }
Closed { id: TaskId, error: Option<ConnectionError<THandlerError<H>>> }
}
impl<T, H, TE, HE> Event<T, H, TE, HE> {
impl<H: IntoConnectionHandler, TE> Event<H, TE> {
pub fn id(&self) -> &TaskId {
match self {
Event::Established { id, .. } => id,
@ -82,7 +87,7 @@ impl<T, H, TE, HE> Event<T, H, TE, HE> {
}
/// A `Task` is a [`Future`] that handles a single connection.
pub struct Task<F, M, H, I, O, E>
pub struct Task<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
@ -92,16 +97,16 @@ where
id: TaskId,
/// Sender to emit events to the manager of this task.
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
events: mpsc::Sender<Event<H, E>>,
/// Receiver for commands sent by the manager of this task.
commands: stream::Fuse<mpsc::Receiver<Command<I>>>,
commands: stream::Fuse<mpsc::Receiver<Command<THandlerInEvent<H>>>>,
/// Inner state of this `Task`.
state: State<F, M, H, O, E>,
state: State<F, M, H, E>,
}
impl<F, M, H, I, O, E> Task<F, M, H, I, O, E>
impl<F, M, H, E> Task<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
@ -110,8 +115,8 @@ where
/// Create a new task to connect and handle some node.
pub fn pending(
id: TaskId,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
commands: mpsc::Receiver<Command<I>>,
events: mpsc::Sender<Event<H, E>>,
commands: mpsc::Receiver<Command<THandlerInEvent<H>>>,
future: F,
handler: H
) -> Self {
@ -129,8 +134,8 @@ where
/// Create a task for an existing node we are already connected to.
pub fn established(
id: TaskId,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
commands: mpsc::Receiver<Command<I>>,
events: mpsc::Sender<Event<H, E>>,
commands: mpsc::Receiver<Command<THandlerInEvent<H>>>,
connection: Connection<M, H::Handler>
) -> Self {
Task {
@ -143,7 +148,7 @@ where
}
/// The state associated with the `Task` of a connection.
enum State<F, M, H, O, E>
enum State<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
@ -165,20 +170,20 @@ where
/// is polled for new events in this state, otherwise the event
/// must be sent to the `Manager` before the connection can be
/// polled again.
event: Option<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>
event: Option<Event<H, E>>,
},
/// The connection is closing (active close).
Closing(Close<M>),
/// The task is terminating with a final event for the `Manager`.
Terminating(Event<O, H, E, <H::Handler as ConnectionHandler>::Error>),
Terminating(Event<H, E>),
/// The task has finished.
Done
}
impl<F, M, H, I, O, E> Unpin for Task<F, M, H, I, O, E>
impl<F, M, H, E> Unpin for Task<F, M, H, E>
where
M: StreamMuxer,
H: IntoConnectionHandler,
@ -186,12 +191,14 @@ where
{
}
impl<F, M, H, I, O, E> Future for Task<F, M, H, I, O, E>
impl<F, M, H, E> Future for Task<F, M, H, E>
where
M: StreamMuxer,
F: Future<Output = ConnectResult<M, E>>,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>, InEvent = I, OutEvent = O>
H::Handler: ConnectionHandler<
Substream = Substream<M>,
> + Send + 'static,
{
type Output = ();

View File

@ -34,6 +34,11 @@ use crate::{
OutgoingInfo,
Substream,
PendingConnectionError,
handler::{
THandlerInEvent,
THandlerOutEvent,
THandlerError,
},
manager::{self, Manager, ManagerConfig},
},
muxing::StreamMuxer,
@ -45,7 +50,7 @@ use smallvec::SmallVec;
use std::{convert::TryFrom as _, error, fmt, num::NonZeroU32, task::Context, task::Poll};
/// A connection `Pool` manages a set of connections for each peer.
pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
pub struct Pool<THandler: IntoConnectionHandler, TTransErr> {
local_id: PeerId,
/// The connection counter(s).
@ -55,7 +60,7 @@ pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
/// established and pending connections.
///
/// For every established connection there is a corresponding entry in `established`.
manager: Manager<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
manager: Manager<THandler, TTransErr>,
/// The managed connections of each peer that are currently considered
/// established, as witnessed by the associated `ConnectedPoint`.
@ -71,8 +76,8 @@ pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
disconnected: Vec<Disconnected>,
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
impl<THandler: IntoConnectionHandler, TTransErr> fmt::Debug
for Pool<THandler, TTransErr>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Pool")
@ -81,14 +86,14 @@ for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
}
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> Unpin
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {}
impl<THandler: IntoConnectionHandler, TTransErr> Unpin
for Pool<THandler, TTransErr> {}
/// Event that can happen on the `Pool`.
pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
pub enum PoolEvent<'a, THandler: IntoConnectionHandler, TTransErr> {
/// A new connection has been established.
ConnectionEstablished {
connection: EstablishedConnection<'a, TInEvent>,
connection: EstablishedConnection<'a, THandlerInEvent<THandler>>,
num_established: NonZeroU32,
},
@ -109,9 +114,9 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
connected: Connected,
/// The error that occurred, if any. If `None`, the connection
/// was closed by the local peer.
error: Option<ConnectionError<THandlerErr>>,
error: Option<ConnectionError<THandlerError<THandler>>>,
/// A reference to the pool that used to manage the connection.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
pool: &'a mut Pool<THandler, TTransErr>,
/// The remaining number of established connections to the same peer.
num_established: u32,
},
@ -130,21 +135,21 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
/// The (expected) peer of the failed connection.
peer: Option<PeerId>,
/// A reference to the pool that managed the connection.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
pool: &'a mut Pool<THandler, TTransErr>,
},
/// A node has produced an event.
ConnectionEvent {
/// The connection that has generated the event.
connection: EstablishedConnection<'a, TInEvent>,
connection: EstablishedConnection<'a, THandlerInEvent<THandler>>,
/// The produced event.
event: TOutEvent,
event: THandlerOutEvent<THandler>,
},
/// The connection to a node has changed its address.
AddressChange {
/// The connection that has changed address.
connection: EstablishedConnection<'a, TInEvent>,
connection: EstablishedConnection<'a, THandlerInEvent<THandler>>,
/// The new endpoint.
new_endpoint: ConnectedPoint,
/// The old endpoint.
@ -152,13 +157,9 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
},
}
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
for PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
impl<'a, THandler: IntoConnectionHandler, TTransErr> fmt::Debug for PoolEvent<'a, THandler, TTransErr>
where
TOutEvent: fmt::Debug,
TTransErr: fmt::Debug,
THandlerErr: fmt::Debug,
TInEvent: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match *self {
@ -197,8 +198,8 @@ where
}
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
impl<THandler: IntoConnectionHandler, TTransErr>
Pool<THandler, TTransErr>
{
/// Creates a new empty `Pool`.
pub fn new(
@ -239,15 +240,9 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
@ -274,15 +269,9 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
@ -307,15 +296,9 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
@ -360,15 +343,9 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = connection::Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
@ -384,7 +361,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
///
/// Returns `None` if the pool has no connection with the given ID.
pub fn get(&mut self, id: ConnectionId)
-> Option<PoolConnection<'_, TInEvent>>
-> Option<PoolConnection<'_, THandlerInEvent<THandler>>>
{
match self.manager.entry(id) {
Some(manager::Entry::Established(entry)) =>
@ -403,7 +380,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
/// Gets an established connection from the pool by ID.
pub fn get_established(&mut self, id: ConnectionId)
-> Option<EstablishedConnection<'_, TInEvent>>
-> Option<EstablishedConnection<'_, THandlerInEvent<THandler>>>
{
match self.get(id) {
Some(PoolConnection::Established(c)) => Some(c),
@ -413,7 +390,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
/// Gets a pending outgoing connection by ID.
pub fn get_outgoing(&mut self, id: ConnectionId)
-> Option<PendingConnection<'_, TInEvent>>
-> Option<PendingConnection<'_, THandlerInEvent<THandler>>>
{
match self.pending.get(&id) {
Some((ConnectedPoint::Dialer { .. }, _peer)) =>
@ -494,11 +471,9 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
pub fn iter_peer_established<'a>(&'a mut self, peer: &PeerId)
-> EstablishedConnectionIter<'a,
impl Iterator<Item = ConnectionId>,
TInEvent,
TOutEvent,
THandler,
TTransErr,
THandlerErr>
>
{
let ids = self.iter_peer_established_info(peer)
.map(|(id, _endpoint)| *id)
@ -563,7 +538,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
/// > **Note**: We use a regular `poll` method instead of implementing `Stream`,
/// > because we want the `Pool` to stay borrowed if necessary.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<
PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
PoolEvent<'a, THandler, TTransErr>
> {
// Drain events resulting from forced disconnections.
//
@ -828,22 +803,21 @@ impl<TInEvent> EstablishedConnection<'_, TInEvent> {
}
/// An iterator over established connections in a pool.
pub struct EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
ids: I
pub struct EstablishedConnectionIter<'a, I, THandler: IntoConnectionHandler, TTransErr> {
pool: &'a mut Pool<THandler, TTransErr>,
ids: I,
}
// Note: Ideally this would be an implementation of `Iterator`, but that
// requires GATs (cf. https://github.com/rust-lang/rust/issues/44265) and
// a different definition of `Iterator`.
impl<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
impl<'a, I, THandler: IntoConnectionHandler, TTransErr> EstablishedConnectionIter<'a, I, THandler, TTransErr>
where
I: Iterator<Item = ConnectionId>
{
/// Obtains the next connection, if any.
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Option<EstablishedConnection<'_, TInEvent>>
pub fn next(&mut self) -> Option<EstablishedConnection<'_, THandlerInEvent<THandler>>>
{
while let Some(id) = self.ids.next() {
if self.pool.manager.is_established(&id) { // (*)
@ -865,7 +839,7 @@ where
/// Returns the first connection, if any, consuming the iterator.
pub fn into_first<'b>(mut self)
-> Option<EstablishedConnection<'b, TInEvent>>
-> Option<EstablishedConnection<'b, THandlerInEvent<THandler>>>
where 'a: 'b
{
while let Some(id) = self.ids.next() {

View File

@ -42,6 +42,10 @@ use crate::{
ListenersStream,
PendingConnectionError,
Substream,
handler::{
THandlerInEvent,
THandlerOutEvent,
},
manager::ManagerConfig,
pool::{Pool, PoolEvent},
},
@ -62,7 +66,7 @@ use std::{
};
/// Implementation of `Stream` that handles the nodes.
pub struct Network<TTrans, TInEvent, TOutEvent, THandler>
pub struct Network<TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
@ -74,8 +78,7 @@ where
listeners: ListenersStream<TTrans>,
/// The nodes currently active.
pool: Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error>,
pool: Pool<THandler, TTrans::Error>,
/// The ongoing dialing attempts.
///
@ -92,8 +95,8 @@ where
dialing: FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
}
impl<TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
Network<TTrans, TInEvent, TOutEvent, THandler>
impl<TTrans, THandler> fmt::Debug for
Network<TTrans, THandler>
where
TTrans: fmt::Debug + Transport,
THandler: fmt::Debug + ConnectionHandler,
@ -108,16 +111,16 @@ where
}
}
impl<TTrans, TInEvent, TOutEvent, THandler> Unpin for
Network<TTrans, TInEvent, TOutEvent, THandler>
impl<TTrans, THandler> Unpin for
Network<TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
}
impl<TTrans, TInEvent, TOutEvent, THandler>
Network<TTrans, TInEvent, TOutEvent, THandler>
impl<TTrans, THandler>
Network<TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
@ -128,15 +131,15 @@ where
}
}
impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler>
Network<TTrans, TInEvent, TOutEvent, THandler>
impl<TTrans, TMuxer, THandler>
Network<TTrans, THandler>
where
TTrans: Transport + Clone,
TMuxer: StreamMuxer,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>> + Send,
{
/// Creates a new node events stream.
pub fn new(
@ -223,8 +226,6 @@ where
TTrans::Dial: Send + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
// If the address ultimately encapsulates an expected peer ID, dial that peer
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
@ -313,7 +314,7 @@ where
/// Obtains a view of a [`Peer`] with the given ID in the network.
pub fn peer(&mut self, peer_id: PeerId)
-> Peer<'_, TTrans, TInEvent, TOutEvent, THandler>
-> Peer<'_, TTrans, THandler>
{
Peer::new(self, peer_id)
}
@ -329,8 +330,6 @@ where
handler: THandler,
) -> Result<ConnectionId, ConnectionLimit>
where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TTrans: Transport<Output = (PeerId, TMuxer)>,
@ -347,7 +346,7 @@ where
}
/// Provides an API similar to `Stream`, except that it cannot error.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>>
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<NetworkEvent<'a, TTrans, THandlerInEvent<THandler>, THandlerOutEvent<THandler>, THandler>>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
@ -355,10 +354,7 @@ where
TTrans::ListenerUpgrade: Send + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
{
// Poll the listener(s) for new connections.
@ -455,8 +451,6 @@ where
TTrans::Error: Send + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
dial_peer_impl(self.transport().clone(), &mut self.pool, &mut self.dialing, opts)
}
@ -472,10 +466,9 @@ struct DialingOpts<PeerId, THandler> {
}
/// Standalone implementation of `Network::dial_peer` for more granular borrowing.
fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
fn dial_peer_impl<TMuxer, THandler, TTrans>(
transport: TTrans,
pool: &mut Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error>,
pool: &mut Pool<THandler, TTrans::Error>,
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<PeerId, THandler>
) -> Result<ConnectionId, DialError>
@ -485,16 +478,12 @@ where
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
> + Send + 'static,
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static,
TTrans::Error: error::Error + Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
// Ensure the address to dial encapsulates the `p2p` protocol for the
// targeted peer, so that the transport has a "fully qualified" address
@ -531,13 +520,13 @@ where
///
/// If the failed connection attempt was a dialing attempt and there
/// are more addresses to try, new `DialingOpts` are returned.
fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler>(
fn on_connection_failed<'a, TTrans, THandler>(
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
id: ConnectionId,
endpoint: ConnectedPoint,
error: PendingConnectionError<TTrans::Error>,
handler: Option<THandler>,
) -> (Option<DialingOpts<PeerId, THandler>>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>)
) -> (Option<DialingOpts<PeerId, THandler>>, NetworkEvent<'a, TTrans, THandlerInEvent<THandler>, THandlerOutEvent<THandler>, THandler>)
where
TTrans: Transport,
THandler: IntoConnectionHandler,

View File

@ -34,6 +34,7 @@ use crate::{
IntoConnectionHandler,
PendingConnection,
Substream,
handler::THandlerInEvent,
pool::Pool,
},
PeerId
@ -53,30 +54,30 @@ use super::{Network, DialingOpts, DialError};
/// > **Note**: In any state there may always be a pending incoming
/// > connection attempt from the peer, however, the remote identity
/// > of a peer is only known once a connection is fully established.
pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
pub enum Peer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler
{
/// At least one established connection exists to the peer.
Connected(ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
Connected(ConnectedPeer<'a, TTrans, THandler>),
/// There is an ongoing dialing (i.e. outgoing connection) attempt
/// to the peer. There may already be other established connections
/// to the peer.
Dialing(DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
Dialing(DialingPeer<'a, TTrans, THandler>),
/// There exists no established connection to the peer and there is
/// currently no ongoing dialing (i.e. outgoing connection) attempt
/// in progress.
Disconnected(DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
Disconnected(DisconnectedPeer<'a, TTrans, THandler>),
/// The peer represents the local node.
Local,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
impl<'a, TTrans, THandler> fmt::Debug for
Peer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
@ -106,14 +107,14 @@ where
}
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler>
Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
impl<'a, TTrans, THandler>
Peer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
pub(super) fn new(
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
network: &'a mut Network<TTrans, THandler>,
peer_id: PeerId
) -> Self {
if peer_id == network.local_peer_id {
@ -133,39 +134,37 @@ where
fn disconnected(
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
network: &'a mut Network<TTrans, THandler>,
peer_id: PeerId
) -> Self {
Peer::Disconnected(DisconnectedPeer { network, peer_id })
}
fn connected(
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
network: &'a mut Network<TTrans, THandler>,
peer_id: PeerId
) -> Self {
Peer::Connected(ConnectedPeer { network, peer_id })
}
fn dialing(
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
network: &'a mut Network<TTrans, THandler>,
peer_id: PeerId
) -> Self {
Peer::Dialing(DialingPeer { network, peer_id })
}
}
impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler>
Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
impl<'a, TTrans, TMuxer, THandler>
Peer<'a, TTrans, THandler>
where
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
{
@ -209,7 +208,7 @@ where
/// attempt to the first address fails.
pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler)
-> Result<
(ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
(ConnectionId, DialingPeer<'a, TTrans, THandler>),
DialError
>
where
@ -238,7 +237,7 @@ where
///
/// Succeeds if the there is at least one established connection to the peer.
pub fn into_connected(self) -> Option<
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
ConnectedPeer<'a, TTrans, THandler>
> {
match self {
Peer::Connected(peer) => Some(peer),
@ -252,7 +251,7 @@ where
///
/// Succeeds if the there is at least one pending outgoing connection to the peer.
pub fn into_dialing(self) -> Option<
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
DialingPeer<'a, TTrans, THandler>
> {
match self {
Peer::Dialing(peer) => Some(peer),
@ -265,7 +264,7 @@ where
/// Converts the peer into a `DisconnectedPeer`, if neither an established connection
/// nor a dialing attempt exists.
pub fn into_disconnected(self) -> Option<
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
DisconnectedPeer<'a, TTrans, THandler>
> {
match self {
Peer::Disconnected(peer) => Some(peer),
@ -277,17 +276,17 @@ where
/// The representation of a peer in a [`Network`] to whom at least
/// one established connection exists. There may also be additional ongoing
/// dialing attempts to the peer.
pub struct ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
pub struct ConnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
network: &'a mut Network<TTrans, THandler>,
peer_id: PeerId,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler>
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
impl<'a, TTrans, THandler>
ConnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
@ -297,13 +296,13 @@ where
}
/// Returns the `ConnectedPeer` into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> {
pub fn into_peer(self) -> Peer<'a, TTrans, THandler> {
Peer::Connected(self)
}
/// Obtains an established connection to the peer by ID.
pub fn connection(&mut self, id: ConnectionId)
-> Option<EstablishedConnection<TInEvent>>
-> Option<EstablishedConnection<THandlerInEvent<THandler>>>
{
self.network.pool.get_established(id)
}
@ -323,7 +322,7 @@ where
/// Converts this peer into a [`DialingPeer`], if there is an ongoing
/// dialing attempt, `None` otherwise.
pub fn into_dialing(self) -> Option<
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
DialingPeer<'a, TTrans, THandler>
> {
if self.network.dialing.contains_key(&self.peer_id) {
Some(DialingPeer { network: self.network, peer_id: self.peer_id })
@ -336,18 +335,16 @@ where
pub fn connections(&mut self) ->
EstablishedConnectionIter<
impl Iterator<Item = ConnectionId>,
TInEvent,
TOutEvent,
THandler,
TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error>
>
{
self.network.pool.iter_peer_established(&self.peer_id)
}
/// Obtains some established connection to the peer.
pub fn some_connection(&mut self)
-> EstablishedConnection<TInEvent>
-> EstablishedConnection<THandlerInEvent<THandler>>
{
self.connections()
.into_first()
@ -356,15 +353,15 @@ where
/// Disconnects from the peer, closing all connections.
pub fn disconnect(self)
-> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
-> DisconnectedPeer<'a, TTrans, THandler>
{
self.network.disconnect(&self.peer_id);
DisconnectedPeer { network: self.network, peer_id: self.peer_id }
}
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
impl<'a, TTrans, THandler> fmt::Debug for
ConnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
@ -381,17 +378,17 @@ where
/// The representation of a peer in a [`Network`] to whom a dialing
/// attempt is ongoing. There may already exist other established
/// connections to this peer.
pub struct DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
pub struct DialingPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
network: &'a mut Network<TTrans, THandler>,
peer_id: PeerId,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler>
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
impl<'a, TTrans, THandler>
DialingPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
@ -401,14 +398,14 @@ where
}
/// Returns the `DialingPeer` into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> {
pub fn into_peer(self) -> Peer<'a, TTrans, THandler> {
Peer::Dialing(self)
}
/// Disconnects from this peer, closing all established connections and
/// aborting all dialing attempts.
pub fn disconnect(self)
-> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
-> DisconnectedPeer<'a, TTrans, THandler>
{
self.network.disconnect(&self.peer_id);
DisconnectedPeer { network: self.network, peer_id: self.peer_id }
@ -423,7 +420,7 @@ where
/// Converts the peer into a `ConnectedPeer`, if an established connection exists.
pub fn into_connected(self)
-> Option<ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>>
-> Option<ConnectedPeer<'a, TTrans, THandler>>
{
if self.is_connected() {
Some(ConnectedPeer { peer_id: self.peer_id, network: self.network })
@ -435,7 +432,7 @@ where
/// Obtains a dialing attempt to the peer by connection ID of
/// the current connection attempt.
pub fn attempt(&mut self, id: ConnectionId)
-> Option<DialingAttempt<'_, TInEvent>>
-> Option<DialingAttempt<'_, THandlerInEvent<THandler>>>
{
if let hash_map::Entry::Occupied(attempts) = self.network.dialing.entry(self.peer_id) {
if let Some(pos) = attempts.get().iter().position(|s| s.current.0 == id) {
@ -448,13 +445,7 @@ where
}
/// Gets an iterator over all dialing (i.e. pending outgoing) connections to the peer.
pub fn attempts(&mut self)
-> DialingAttemptIter<'_,
TInEvent,
TOutEvent,
THandler,
TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error>
pub fn attempts(&mut self) -> DialingAttemptIter<'_, THandler, TTrans::Error>
{
DialingAttemptIter::new(&self.peer_id, &mut self.network.pool, &mut self.network.dialing)
}
@ -463,7 +454,7 @@ where
///
/// At least one dialing connection is guaranteed to exist on a `DialingPeer`.
pub fn some_attempt(&mut self)
-> DialingAttempt<'_, TInEvent>
-> DialingAttempt<'_, THandlerInEvent<THandler>>
{
self.attempts()
.into_first()
@ -471,8 +462,8 @@ where
}
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
impl<'a, TTrans, THandler> fmt::Debug for
DialingPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
@ -489,17 +480,17 @@ where
/// The representation of a peer to whom the `Network` has currently
/// neither an established connection, nor an ongoing dialing attempt
/// initiated by the local peer.
pub struct DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
pub struct DisconnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
peer_id: PeerId,
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
network: &'a mut Network<TTrans, THandler>,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
impl<'a, TTrans, THandler> fmt::Debug for
DisconnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
@ -511,8 +502,8 @@ where
}
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler>
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
impl<'a, TTrans, THandler>
DisconnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
@ -522,7 +513,7 @@ where
}
/// Returns the `DisconnectedPeer` into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> {
pub fn into_peer(self) -> Peer<'a, TTrans, THandler> {
Peer::Disconnected(self)
}
@ -539,14 +530,12 @@ where
connected: Connected,
connection: Connection<TMuxer, THandler::Handler>,
) -> Result<
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
ConnectedPeer<'a, TTrans, THandler>,
ConnectionLimit
> where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: Send + 'static,
TTrans::Error: Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
@ -631,11 +620,11 @@ impl<'a, TInEvent>
}
/// An iterator over the ongoing dialing attempts to a peer.
pub struct DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
pub struct DialingAttemptIter<'a, THandler: IntoConnectionHandler, TTransErr> {
/// The peer whose dialing attempts are being iterated.
peer_id: &'a PeerId,
/// The underlying connection `Pool` of the `Network`.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
pool: &'a mut Pool<THandler, TTransErr>,
/// The state of all current dialing attempts known to the `Network`.
///
/// Ownership of the `OccupiedEntry` for `peer_id` containing all attempts must be
@ -651,12 +640,12 @@ pub struct DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THan
// Note: Ideally this would be an implementation of `Iterator`, but that
// requires GATs (cf. https://github.com/rust-lang/rust/issues/44265) and
// a different definition of `Iterator`.
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
impl<'a, THandler: IntoConnectionHandler, TTransErr>
DialingAttemptIter<'a, THandler, TTransErr>
{
fn new(
peer_id: &'a PeerId,
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
pool: &'a mut Pool<THandler, TTransErr>,
dialing: &'a mut FnvHashMap<PeerId, SmallVec<[DialingState; 10]>>,
) -> Self {
let end = dialing.get(peer_id).map_or(0, |conns| conns.len());
@ -665,7 +654,7 @@ impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
/// Obtains the next dialing connection, if any.
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Option<DialingAttempt<'_, TInEvent>> {
pub fn next(&mut self) -> Option<DialingAttempt<'_, THandlerInEvent<THandler>>> {
// If the number of elements reduced, the current `DialingAttempt` has been
// aborted and iteration needs to continue from the previous position to
// account for the removed element.
@ -693,7 +682,7 @@ impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
/// Returns the first connection, if any, consuming the iterator.
pub fn into_first<'b>(self)
-> Option<DialingAttempt<'b, TInEvent>>
-> Option<DialingAttempt<'b, THandlerInEvent<THandler>>>
where 'a: 'b
{
if self.pos == self.end {

View File

@ -23,7 +23,7 @@ use libp2p_noise as noise;
use libp2p_tcp as tcp;
use std::{io, pin::Pin, task::Context, task::Poll};
type TestNetwork = Network<TestTransport, (), (), TestHandler>;
type TestNetwork = Network<TestTransport, TestHandler>;
type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox)>;
/// Creates a new `TestNetwork` with a TCP transport.

View File

@ -38,7 +38,7 @@ use rand::random;
use std::{io, task::{Context, Poll}};
type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox)>;
type TestNetwork = Network<TestTransport, (), (), TestHandler>;
type TestNetwork = Network<TestTransport, TestHandler>;
fn mk_transport(up: upgrade::Version) -> (PeerId, TestTransport) {
let keys = identity::Keypair::generate_ed25519();

View File

@ -395,6 +395,7 @@ impl NetworkBehaviour for Floodsub {
}
/// Transmission between the `OneShotHandler` and the `FloodsubHandler`.
#[derive(Debug)]
pub enum InnerMessage {
/// We received an RPC from a remote.
Rx(FloodsubRpc),

View File

@ -55,7 +55,7 @@ impl<T> KademliaHandlerProto<T> {
}
}
impl<T: Clone + Send + 'static> IntoProtocolsHandler for KademliaHandlerProto<T> {
impl<T: Clone + fmt::Debug + Send + 'static> IntoProtocolsHandler for KademliaHandlerProto<T> {
type Handler = KademliaHandler<T>;
fn into_handler(self, _: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
@ -457,7 +457,7 @@ impl<TUserData> KademliaHandler<TUserData> {
impl<TUserData> ProtocolsHandler for KademliaHandler<TUserData>
where
TUserData: Clone + Send + 'static,
TUserData: Clone + fmt::Debug + Send + 'static,
{
type InEvent = KademliaHandlerIn<TUserData>;
type OutEvent = KademliaHandlerEvent<TUserData>;

View File

@ -2,6 +2,10 @@
- Update dependencies.
- Implement `Debug` for `RelayHandlerEvent` and `RelayHandlerIn`. See [PR 2183].
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
# 0.3.0 [2021-07-12]
- Update dependencies.

View File

@ -33,6 +33,7 @@ use libp2p_swarm::{
ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use log::warn;
use std::fmt;
use std::task::{Context, Poll};
use std::time::Duration;
use wasm_timer::Instant;
@ -195,6 +196,55 @@ pub enum RelayHandlerEvent {
},
}
impl fmt::Debug for RelayHandlerEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RelayHandlerEvent::IncomingRelayReq {
request_id,
src_addr,
req: _,
} => f
.debug_struct("RelayHandlerEvent::IncomingRelayReq")
.field("request_id", request_id)
.field("src_addr", src_addr)
.finish(),
RelayHandlerEvent::IncomingDstReq(_) => {
f.debug_tuple("RelayHandlerEvent::IncomingDstReq").finish()
}
RelayHandlerEvent::OutgoingRelayReqSuccess(peer_id, request_id, connection) => f
.debug_tuple("RelayHandlerEvent::OutgoingRelayReqSuccess")
.field(peer_id)
.field(request_id)
.field(connection)
.finish(),
RelayHandlerEvent::IncomingDstReqSuccess {
stream,
src_peer_id,
relay_peer_id,
relay_addr,
} => f
.debug_struct("RelayHandlerEvent::IncomingDstReqSuccess")
.field("stream", stream)
.field("src_peer_id", src_peer_id)
.field("relay_peer_id", relay_peer_id)
.field("relay_addr", relay_addr)
.finish(),
RelayHandlerEvent::OutgoingRelayReqError(peer_id, request_id) => f
.debug_tuple("RelayHandlerEvent::OutgoingRelayReqError")
.field(peer_id)
.field(request_id)
.finish(),
RelayHandlerEvent::OutgoingDstReqError {
src_connection_id,
incoming_relay_req_deny_fut: _,
} => f
.debug_struct("RelayHandlerEvent::OutgoingDstReqError")
.field("src_connection_id", src_connection_id)
.finish(),
}
}
}
/// Event that can be sent to the relay handler.
pub enum RelayHandlerIn {
/// Tell the handler whether it is handling a connection used to listen for incoming relayed
@ -233,6 +283,48 @@ pub enum RelayHandlerIn {
},
}
impl fmt::Debug for RelayHandlerIn {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RelayHandlerIn::UsedForListening(_) => {
f.debug_tuple("RelayHandlerIn::UsedForListening").finish()
}
RelayHandlerIn::DenyIncomingRelayReq(_) => f
.debug_tuple("RelayHandlerIn::DenyIncomingRelayReq")
.finish(),
RelayHandlerIn::AcceptDstReq(_) => {
f.debug_tuple("RelayHandlerIn::AcceptDstReq").finish()
}
RelayHandlerIn::DenyDstReq(_) => f.debug_tuple("RelayHandlerIn::DenyDstReq").finish(),
RelayHandlerIn::OutgoingRelayReq {
src_peer_id,
dst_peer_id,
request_id,
dst_addr,
} => f
.debug_struct("RelayHandlerIn::OutgoingRelayReq")
.field("src_peer_id", src_peer_id)
.field("dst_peer_id", dst_peer_id)
.field("request_id", request_id)
.field("dst_addr", dst_addr)
.finish(),
RelayHandlerIn::OutgoingDstReq {
src_peer_id,
src_addr,
src_connection_id,
request_id,
incoming_relay_req: _,
} => f
.debug_struct("RelayHandlerIn::OutgoingDstReq")
.field("src_peer_id", src_peer_id)
.field("src_addr", src_addr)
.field("src_connection_id", src_connection_id)
.field("request_id", request_id)
.finish(),
}
}
}
impl RelayHandler {
/// Builds a new `RelayHandler`.
pub fn new(

View File

@ -2,6 +2,11 @@
- Update dependencies.
- Manually implement `Debug` for `RequestResponseHandlerEvent` and
`RequestProtocol`. See [PR 2183].
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
# 0.12.0 [2021-07-12]
- Update dependencies.

View File

@ -46,6 +46,7 @@ use libp2p_swarm::{
use smallvec::SmallVec;
use std::{
collections::VecDeque,
fmt,
io,
sync::{atomic::{AtomicU64, Ordering}, Arc},
time::Duration,
@ -114,7 +115,6 @@ where
/// The events emitted by the [`RequestResponseHandler`].
#[doc(hidden)]
#[derive(Debug)]
pub enum RequestResponseHandlerEvent<TCodec>
where
TCodec: RequestResponseCodec
@ -147,6 +147,37 @@ where
InboundUnsupportedProtocols(RequestId),
}
impl<TCodec: RequestResponseCodec> fmt::Debug for RequestResponseHandlerEvent<TCodec> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RequestResponseHandlerEvent::Request { request_id, request: _, sender: _ } => f.debug_struct("RequestResponseHandlerEvent::Request")
.field("request_id", request_id)
.finish(),
RequestResponseHandlerEvent::Response { request_id, response: _ } => f.debug_struct("RequestResponseHandlerEvent::Response")
.field("request_id", request_id)
.finish(),
RequestResponseHandlerEvent::ResponseSent(request_id) => f.debug_tuple("RequestResponseHandlerEvent::ResponseSent")
.field(request_id)
.finish(),
RequestResponseHandlerEvent::ResponseOmission(request_id) => f.debug_tuple("RequestResponseHandlerEvent::ResponseOmission")
.field(request_id)
.finish(),
RequestResponseHandlerEvent::OutboundTimeout(request_id) => f.debug_tuple("RequestResponseHandlerEvent::OutboundTimeout")
.field(request_id)
.finish(),
RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => f.debug_tuple("RequestResponseHandlerEvent::OutboundUnsupportedProtocols")
.field(request_id)
.finish(),
RequestResponseHandlerEvent::InboundTimeout(request_id) => f.debug_tuple("RequestResponseHandlerEvent::InboundTimeout")
.field(request_id)
.finish(),
RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => f.debug_tuple("RequestResponseHandlerEvent::InboundUnsupportedProtocols")
.field(request_id)
.finish(),
}
}
}
impl<TCodec> ProtocolsHandler for RequestResponseHandler<TCodec>
where
TCodec: RequestResponseCodec + Send + Clone + 'static,
@ -345,4 +376,3 @@ where
Poll::Pending
}
}

View File

@ -30,7 +30,7 @@ use futures::{channel::oneshot, future::BoxFuture, prelude::*};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_swarm::NegotiatedSubstream;
use smallvec::SmallVec;
use std::io;
use std::{fmt, io};
/// The level of support for a particular protocol.
#[derive(Debug, Clone)]
@ -127,7 +127,6 @@ where
/// Request substream upgrade protocol.
///
/// Sends a request and receives a response.
#[derive(Debug)]
pub struct RequestProtocol<TCodec>
where
TCodec: RequestResponseCodec
@ -138,6 +137,17 @@ where
pub(crate) request: TCodec::Request,
}
impl<TCodec> fmt::Debug for RequestProtocol<TCodec>
where
TCodec: RequestResponseCodec,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RequestProtocol")
.field("request_id", &self.request_id)
.finish()
}
}
impl<TCodec> UpgradeInfo for RequestProtocol<TCodec>
where
TCodec: RequestResponseCodec

View File

@ -10,8 +10,12 @@
trait parameters on `Swarm` (previously `ExpandedSwarm`), deriving parameters
through associated types on `TBehaviour`. See [PR 2182].
- Require `ProtocolsHandler::{InEvent,OutEvent,Error}` to implement `Debug` (see
[PR 2183]).
[PR 2150]: https://github.com/libp2p/rust-libp2p/pull/2150/
[PR 2182]: https://github.com/libp2p/rust-libp2p/pull/2182
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
# 0.30.0 [2021-07-12]

View File

@ -100,6 +100,7 @@ use libp2p_core::{Executor, Multiaddr, Negotiated, PeerId, Transport, connection
ConnectionLimit,
ConnectedPoint,
EstablishedConnection,
ConnectionHandler,
IntoConnectionHandler,
ListenerId,
PendingConnectionError,
@ -278,8 +279,6 @@ where
{
network: Network<
transport::Boxed<(PeerId, StreamMuxerBox)>,
THandlerInEvent<TBehaviour>,
THandlerOutEvent<TBehaviour>,
NodeHandlerWrapperBuilder<THandler<TBehaviour>>,
>,
@ -682,7 +681,7 @@ where
}
},
PendingNotifyHandler::Any(ids) => {
if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
if let Some((event, ids)) = notify_any::<_, _, TBehaviour>(ids, &mut peer, event, cx) {
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
@ -759,7 +758,7 @@ where
}
NotifyHandler::Any => {
let ids = peer.connections().into_ids().collect();
if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
if let Some((event, ids)) = notify_any::<_, _, TBehaviour>(ids, &mut peer, event, cx) {
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
return Poll::Pending
@ -838,15 +837,17 @@ fn notify_one<'a, THandlerInEvent>(
///
/// Returns `None` if either all connections are closing or the event
/// was successfully sent to a handler, in either case the event is consumed.
fn notify_any<'a, TTrans, THandlerInEvent, THandlerOutEvent, THandler>(
fn notify_any<'a, TTrans, THandler, TBehaviour>(
ids: SmallVec<[ConnectionId; 10]>,
peer: &mut ConnectedPeer<'a, TTrans, THandlerInEvent, THandlerOutEvent, THandler>,
event: THandlerInEvent,
peer: &mut ConnectedPeer<'a, TTrans, THandler>,
event: THandlerInEvent<TBehaviour>,
cx: &mut Context<'_>,
) -> Option<(THandlerInEvent, SmallVec<[ConnectionId; 10]>)>
) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
where
TTrans: Transport,
TBehaviour: NetworkBehaviour,
THandler: IntoConnectionHandler,
THandler::Handler: ConnectionHandler<InEvent = THandlerInEvent<TBehaviour>, OutEvent = THandlerOutEvent<TBehaviour>>
{
let mut pending = SmallVec::new();
let mut event = Some(event); // (1)

View File

@ -101,11 +101,11 @@ pub use select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect};
/// continue reading data until the remote closes its side of the connection.
pub trait ProtocolsHandler: Send + 'static {
/// Custom event that can be received from the outside.
type InEvent: Send + 'static;
type InEvent: fmt::Debug + Send + 'static;
/// Custom event that can be produced by the handler and that will be returned to the outside.
type OutEvent: Send + 'static;
type OutEvent: fmt::Debug + Send + 'static;
/// The type of errors returned by [`ProtocolsHandler::poll`].
type Error: error::Error + Send + 'static;
type Error: error::Error + fmt::Debug + Send + 'static;
/// The inbound upgrade for the protocol(s) used by the handler.
type InboundProtocol: InboundUpgradeSend;
/// The outbound upgrade for the protocol(s) used by the handler.

View File

@ -27,7 +27,7 @@ use crate::protocols_handler::{
ProtocolsHandlerUpgrErr
};
use libp2p_core::Multiaddr;
use std::{marker::PhantomData, task::Context, task::Poll};
use std::{fmt::Debug, marker::PhantomData, task::Context, task::Poll};
/// Wrapper around a protocol handler that turns the input event into something else.
pub struct MapInEvent<TProtoHandler, TNewIn, TMap> {
@ -51,7 +51,7 @@ impl<TProtoHandler, TMap, TNewIn> ProtocolsHandler for MapInEvent<TProtoHandler,
where
TProtoHandler: ProtocolsHandler,
TMap: Fn(TNewIn) -> Option<TProtoHandler::InEvent>,
TNewIn: Send + 'static,
TNewIn: Debug + Send + 'static,
TMap: Send + 'static,
{
type InEvent = TNewIn;

View File

@ -27,6 +27,7 @@ use crate::protocols_handler::{
ProtocolsHandlerUpgrErr
};
use libp2p_core::Multiaddr;
use std::fmt::Debug;
use std::task::{Context, Poll};
/// Wrapper around a protocol handler that turns the output event into something else.
@ -49,7 +50,7 @@ impl<TProtoHandler, TMap, TNewOut> ProtocolsHandler for MapOutEvent<TProtoHandle
where
TProtoHandler: ProtocolsHandler,
TMap: FnMut(TProtoHandler::OutEvent) -> TNewOut,
TNewOut: Send + 'static,
TNewOut: Debug + Send + 'static,
TMap: Send + 'static,
{
type InEvent = TProtoHandler::InEvent;

View File

@ -43,7 +43,7 @@ use std::{
cmp,
collections::{HashMap, HashSet},
error,
fmt,
fmt::{self, Debug},
hash::Hash,
iter::{self, FromIterator},
task::{Context, Poll},
@ -88,7 +88,7 @@ where
impl<K, H> ProtocolsHandler for MultiHandler<K, H>
where
K: Clone + Hash + Eq + Send + 'static,
K: Clone + Debug + Hash + Eq + Send + 'static,
H: ProtocolsHandler,
H::InboundProtocol: InboundUpgradeSend,
H::OutboundProtocol: OutboundUpgradeSend
@ -312,7 +312,7 @@ where
impl<K, H> IntoProtocolsHandler for IntoMultiHandler<K, H>
where
K: Clone + Eq + Hash + Send + 'static,
K: Debug + Clone + Eq + Hash + Send + 'static,
H: IntoProtocolsHandler
{
type Handler = MultiHandler<K, H::Handler>;

View File

@ -28,7 +28,7 @@ use crate::protocols_handler::{
};
use smallvec::SmallVec;
use std::{error, task::Context, task::Poll, time::Duration};
use std::{error, fmt::Debug, task::Context, task::Poll, time::Duration};
use wasm_timer::Instant;
/// A `ProtocolsHandler` that opens a new substream for each request.
@ -119,12 +119,12 @@ where
impl<TInbound, TOutbound, TEvent> ProtocolsHandler for OneShotHandler<TInbound, TOutbound, TEvent>
where
TInbound: InboundUpgradeSend + Send + 'static,
TOutbound: OutboundUpgradeSend,
TOutbound: Debug + OutboundUpgradeSend,
TInbound::Output: Into<TEvent>,
TOutbound::Output: Into<TEvent>,
TOutbound::Error: error::Error + Send + 'static,
SubstreamProtocol<TInbound, ()>: Clone,
TEvent: Send + 'static,
TEvent: Debug + Send + 'static,
{
type InEvent = TOutbound;
type OutEvent = TEvent;