diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index cc19a911..695d1522 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -12,9 +12,18 @@ - Add `From<&PublicKey> for PeerId` (see [PR 2145]). +- Remove `TInEvent` and `TOutEvent` trait paramters on most public types. + `TInEvent` and `TOutEvent` are implied through `THandler` and thus + superflucious. Both are removed in favor of a derivation through `THandler` + (see [PR 2183]). + +- Require `ConnectionHandler::{InEvent,OutEvent,Error}` to implement `Debug` + (see [PR 2183]). + [PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145 [PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142 -[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137/ +[PR 2137]: https://github.com/libp2p/rust-libp2p/pull/2137 +[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183 # 0.29.0 [2021-07-12] diff --git a/core/src/connection.rs b/core/src/connection.rs index 455a708b..50b44b86 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. mod error; -mod handler; +pub(crate) mod handler; mod listeners; mod substream; diff --git a/core/src/connection/handler.rs b/core/src/connection/handler.rs index a2cff9ee..0f1c2f6b 100644 --- a/core/src/connection/handler.rs +++ b/core/src/connection/handler.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::Multiaddr; -use std::{task::Context, task::Poll}; +use std::{fmt::Debug, task::Context, task::Poll}; use super::{Connected, SubstreamEndpoint}; /// The interface of a connection handler. @@ -30,14 +30,14 @@ pub trait ConnectionHandler { /// /// See also [`EstablishedConnection::notify_handler`](super::EstablishedConnection::notify_handler) /// and [`ConnectionHandler::inject_event`]. - type InEvent; + type InEvent: Debug + Send + 'static; /// The outbound type of events that the handler emits to the `Network` /// through [`ConnectionHandler::poll`]. /// /// See also [`NetworkEvent::ConnectionEvent`](crate::network::NetworkEvent::ConnectionEvent). - type OutEvent; + type OutEvent: Debug + Send + 'static; /// The type of errors that the handler can produce when polled by the `Network`. - type Error; + type Error: Debug + Send + 'static; /// The type of the substream containing the data. type Substream; /// Information about a substream. Can be sent to the handler through a `SubstreamEndpoint`, @@ -91,6 +91,10 @@ where } } +pub(crate) type THandlerInEvent = <::Handler as ConnectionHandler>::InEvent; +pub(crate) type THandlerOutEvent = <::Handler as ConnectionHandler>::OutEvent; +pub(crate) type THandlerError = <::Handler as ConnectionHandler>::Error; + /// Event produced by a handler. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ConnectionHandlerEvent { @@ -127,4 +131,3 @@ impl ConnectionHandlerEvent { +pub struct Manager { /// The tasks of the managed connections. /// /// Each managed connection is associated with a (background) task @@ -96,7 +101,7 @@ pub struct Manager { /// background task via a channel. Closing that channel (i.e. dropping /// the sender in the associated `TaskInfo`) stops the background task, /// which will attempt to gracefully close the connection. - tasks: FnvHashMap>, + tasks: FnvHashMap>>, /// Next available identifier for a new connection / task. next_task_id: TaskId, @@ -115,13 +120,13 @@ pub struct Manager { /// Sender distributed to managed tasks for reporting events back /// to the manager. - events_tx: mpsc::Sender>, + events_tx: mpsc::Sender>, /// Receiver for events reported from managed tasks. - events_rx: mpsc::Receiver> + events_rx: mpsc::Receiver> } -impl fmt::Debug for Manager +impl fmt::Debug for Manager { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_map() @@ -179,7 +184,7 @@ enum TaskState { /// Events produced by the [`Manager`]. #[derive(Debug)] -pub enum Event<'a, I, O, H, TE, HE> { +pub enum Event<'a, H: IntoConnectionHandler, TE> { /// A connection attempt has failed. PendingConnectionError { /// The connection ID. @@ -206,27 +211,27 @@ pub enum Event<'a, I, O, H, TE, HE> { connected: Connected, /// The error that occurred, if any. If `None`, the connection /// has been actively closed. - error: Option>, + error: Option>>, }, /// A connection has been established. ConnectionEstablished { /// The entry associated with the new connection. - entry: EstablishedEntry<'a, I>, + entry: EstablishedEntry<'a, THandlerInEvent>, }, /// A connection handler has produced an event. ConnectionEvent { /// The entry associated with the connection that produced the event. - entry: EstablishedEntry<'a, I>, + entry: EstablishedEntry<'a, THandlerInEvent>, /// The produced event. - event: O + event: THandlerOutEvent }, /// A connection to a node has changed its address. AddressChange { /// The entry associated with the connection that changed address. - entry: EstablishedEntry<'a, I>, + entry: EstablishedEntry<'a, THandlerInEvent>, /// The former [`ConnectedPoint`]. old_endpoint: ConnectedPoint, /// The new [`ConnectedPoint`]. @@ -234,7 +239,7 @@ pub enum Event<'a, I, O, H, TE, HE> { }, } -impl Manager { +impl Manager { /// Creates a new connection manager. pub fn new(config: ManagerConfig) -> Self { let (tx, rx) = mpsc::channel(config.task_event_buffer_size); @@ -255,19 +260,13 @@ impl Manager { /// processing the node's events. pub fn add_pending(&mut self, future: F, handler: H) -> ConnectionId where - I: Send + 'static, - O: Send + 'static, TE: error::Error + Send + 'static, - HE: error::Error + Send + 'static, M: StreamMuxer + Send + Sync + 'static, M::OutboundSubstream: Send + 'static, F: Future> + Send + 'static, H: IntoConnectionHandler + Send + 'static, H::Handler: ConnectionHandler< Substream = Substream, - InEvent = I, - OutEvent = O, - Error = HE > + Send + 'static, ::OutboundOpenInfo: Send + 'static, { @@ -293,15 +292,9 @@ impl Manager { H: IntoConnectionHandler + Send + 'static, H::Handler: ConnectionHandler< Substream = Substream, - InEvent = I, - OutEvent = O, - Error = HE > + Send + 'static, ::OutboundOpenInfo: Send + 'static, TE: error::Error + Send + 'static, - HE: error::Error + Send + 'static, - I: Send + 'static, - O: Send + 'static, M: StreamMuxer + Send + Sync + 'static, M::OutboundSubstream: Send + 'static, { @@ -313,7 +306,7 @@ impl Manager { sender: tx, state: TaskState::Established(info) }); - let task: Pin>>, _, _, _, _, _>>> = + let task: Pin>>, _, _, _>>> = Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn)); if let Some(executor) = &mut self.executor { @@ -326,7 +319,7 @@ impl Manager { } /// Gets an entry for a managed connection, if it exists. - pub fn entry(&mut self, id: ConnectionId) -> Option> { + pub fn entry(&mut self, id: ConnectionId) -> Option>> { if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) { Some(Entry::new(task)) } else { @@ -340,7 +333,7 @@ impl Manager { } /// Polls the manager for events relating to the managed connections. - pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> { + pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> { // Advance the content of `local_spawns`. while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {} diff --git a/core/src/connection/manager/task.rs b/core/src/connection/manager/task.rs index c0f123af..a7bdbd3c 100644 --- a/core/src/connection/manager/task.rs +++ b/core/src/connection/manager/task.rs @@ -31,6 +31,11 @@ use crate::{ IntoConnectionHandler, PendingConnectionError, Substream, + handler::{ + THandlerInEvent, + THandlerOutEvent, + THandlerError, + }, }, }; use futures::{prelude::*, channel::mpsc, stream}; @@ -53,7 +58,7 @@ pub enum Command { /// Events that a task can emit to its manager. #[derive(Debug)] -pub enum Event { +pub enum Event { /// A connection to a node has succeeded. Established { id: TaskId, info: Connected }, /// A pending connection failed. @@ -61,15 +66,15 @@ pub enum Event { /// A node we are connected to has changed its address. AddressChange { id: TaskId, new_address: Multiaddr }, /// Notify the manager of an event from the connection. - Notify { id: TaskId, event: T }, + Notify { id: TaskId, event: THandlerOutEvent }, /// A connection closed, possibly due to an error. /// /// If `error` is `None`, the connection has completed /// an active orderly close. - Closed { id: TaskId, error: Option> } + Closed { id: TaskId, error: Option>> } } -impl Event { +impl Event { pub fn id(&self) -> &TaskId { match self { Event::Established { id, .. } => id, @@ -82,7 +87,7 @@ impl Event { } /// A `Task` is a [`Future`] that handles a single connection. -pub struct Task +pub struct Task where M: StreamMuxer, H: IntoConnectionHandler, @@ -92,16 +97,16 @@ where id: TaskId, /// Sender to emit events to the manager of this task. - events: mpsc::Sender::Error>>, + events: mpsc::Sender>, /// Receiver for commands sent by the manager of this task. - commands: stream::Fuse>>, + commands: stream::Fuse>>>, /// Inner state of this `Task`. - state: State, + state: State, } -impl Task +impl Task where M: StreamMuxer, H: IntoConnectionHandler, @@ -110,8 +115,8 @@ where /// Create a new task to connect and handle some node. pub fn pending( id: TaskId, - events: mpsc::Sender::Error>>, - commands: mpsc::Receiver>, + events: mpsc::Sender>, + commands: mpsc::Receiver>>, future: F, handler: H ) -> Self { @@ -129,8 +134,8 @@ where /// Create a task for an existing node we are already connected to. pub fn established( id: TaskId, - events: mpsc::Sender::Error>>, - commands: mpsc::Receiver>, + events: mpsc::Sender>, + commands: mpsc::Receiver>>, connection: Connection ) -> Self { Task { @@ -143,7 +148,7 @@ where } /// The state associated with the `Task` of a connection. -enum State +enum State where M: StreamMuxer, H: IntoConnectionHandler, @@ -165,20 +170,20 @@ where /// is polled for new events in this state, otherwise the event /// must be sent to the `Manager` before the connection can be /// polled again. - event: Option::Error>> + event: Option>, }, /// The connection is closing (active close). Closing(Close), /// The task is terminating with a final event for the `Manager`. - Terminating(Event::Error>), + Terminating(Event), /// The task has finished. Done } -impl Unpin for Task +impl Unpin for Task where M: StreamMuxer, H: IntoConnectionHandler, @@ -186,12 +191,14 @@ where { } -impl Future for Task +impl Future for Task where M: StreamMuxer, F: Future>, H: IntoConnectionHandler, - H::Handler: ConnectionHandler, InEvent = I, OutEvent = O> + H::Handler: ConnectionHandler< + Substream = Substream, + > + Send + 'static, { type Output = (); diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 225a3063..263c36a8 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -34,6 +34,11 @@ use crate::{ OutgoingInfo, Substream, PendingConnectionError, + handler::{ + THandlerInEvent, + THandlerOutEvent, + THandlerError, + }, manager::{self, Manager, ManagerConfig}, }, muxing::StreamMuxer, @@ -45,7 +50,7 @@ use smallvec::SmallVec; use std::{convert::TryFrom as _, error, fmt, num::NonZeroU32, task::Context, task::Poll}; /// A connection `Pool` manages a set of connections for each peer. -pub struct Pool { +pub struct Pool { local_id: PeerId, /// The connection counter(s). @@ -55,7 +60,7 @@ pub struct Pool { /// established and pending connections. /// /// For every established connection there is a corresponding entry in `established`. - manager: Manager, + manager: Manager, /// The managed connections of each peer that are currently considered /// established, as witnessed by the associated `ConnectedPoint`. @@ -71,8 +76,8 @@ pub struct Pool { disconnected: Vec, } -impl fmt::Debug -for Pool +impl fmt::Debug +for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("Pool") @@ -81,14 +86,14 @@ for Pool } } -impl Unpin -for Pool {} +impl Unpin +for Pool {} /// Event that can happen on the `Pool`. -pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> { +pub enum PoolEvent<'a, THandler: IntoConnectionHandler, TTransErr> { /// A new connection has been established. ConnectionEstablished { - connection: EstablishedConnection<'a, TInEvent>, + connection: EstablishedConnection<'a, THandlerInEvent>, num_established: NonZeroU32, }, @@ -109,9 +114,9 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> { connected: Connected, /// The error that occurred, if any. If `None`, the connection /// was closed by the local peer. - error: Option>, + error: Option>>, /// A reference to the pool that used to manage the connection. - pool: &'a mut Pool, + pool: &'a mut Pool, /// The remaining number of established connections to the same peer. num_established: u32, }, @@ -130,21 +135,21 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> { /// The (expected) peer of the failed connection. peer: Option, /// A reference to the pool that managed the connection. - pool: &'a mut Pool, + pool: &'a mut Pool, }, /// A node has produced an event. ConnectionEvent { /// The connection that has generated the event. - connection: EstablishedConnection<'a, TInEvent>, + connection: EstablishedConnection<'a, THandlerInEvent>, /// The produced event. - event: TOutEvent, + event: THandlerOutEvent, }, /// The connection to a node has changed its address. AddressChange { /// The connection that has changed address. - connection: EstablishedConnection<'a, TInEvent>, + connection: EstablishedConnection<'a, THandlerInEvent>, /// The new endpoint. new_endpoint: ConnectedPoint, /// The old endpoint. @@ -152,13 +157,9 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> { }, } -impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug -for PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> +impl<'a, THandler: IntoConnectionHandler, TTransErr> fmt::Debug for PoolEvent<'a, THandler, TTransErr> where - TOutEvent: fmt::Debug, TTransErr: fmt::Debug, - THandlerErr: fmt::Debug, - TInEvent: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { match *self { @@ -197,8 +198,8 @@ where } } -impl - Pool +impl + Pool { /// Creates a new empty `Pool`. pub fn new( @@ -239,15 +240,9 @@ impl THandler: IntoConnectionHandler + Send + 'static, THandler::Handler: ConnectionHandler< Substream = Substream, - InEvent = TInEvent, - OutEvent = TOutEvent, - Error = THandlerErr > + Send + 'static, ::OutboundOpenInfo: Send + 'static, TTransErr: error::Error + Send + 'static, - THandlerErr: error::Error + Send + 'static, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, { @@ -274,15 +269,9 @@ impl THandler: IntoConnectionHandler + Send + 'static, THandler::Handler: ConnectionHandler< Substream = Substream, - InEvent = TInEvent, - OutEvent = TOutEvent, - Error = THandlerErr > + Send + 'static, ::OutboundOpenInfo: Send + 'static, TTransErr: error::Error + Send + 'static, - THandlerErr: error::Error + Send + 'static, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, { @@ -307,15 +296,9 @@ impl THandler: IntoConnectionHandler + Send + 'static, THandler::Handler: ConnectionHandler< Substream = Substream, - InEvent = TInEvent, - OutEvent = TOutEvent, - Error = THandlerErr > + Send + 'static, ::OutboundOpenInfo: Send + 'static, TTransErr: error::Error + Send + 'static, - THandlerErr: error::Error + Send + 'static, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, { @@ -360,15 +343,9 @@ impl THandler: IntoConnectionHandler + Send + 'static, THandler::Handler: ConnectionHandler< Substream = connection::Substream, - InEvent = TInEvent, - OutEvent = TOutEvent, - Error = THandlerErr > + Send + 'static, ::OutboundOpenInfo: Send + 'static, TTransErr: error::Error + Send + 'static, - THandlerErr: error::Error + Send + 'static, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, { @@ -384,7 +361,7 @@ impl /// /// Returns `None` if the pool has no connection with the given ID. pub fn get(&mut self, id: ConnectionId) - -> Option> + -> Option>> { match self.manager.entry(id) { Some(manager::Entry::Established(entry)) => @@ -403,7 +380,7 @@ impl /// Gets an established connection from the pool by ID. pub fn get_established(&mut self, id: ConnectionId) - -> Option> + -> Option>> { match self.get(id) { Some(PoolConnection::Established(c)) => Some(c), @@ -413,7 +390,7 @@ impl /// Gets a pending outgoing connection by ID. pub fn get_outgoing(&mut self, id: ConnectionId) - -> Option> + -> Option>> { match self.pending.get(&id) { Some((ConnectedPoint::Dialer { .. }, _peer)) => @@ -494,11 +471,9 @@ impl pub fn iter_peer_established<'a>(&'a mut self, peer: &PeerId) -> EstablishedConnectionIter<'a, impl Iterator, - TInEvent, - TOutEvent, THandler, TTransErr, - THandlerErr> + > { let ids = self.iter_peer_established_info(peer) .map(|(id, _endpoint)| *id) @@ -563,7 +538,7 @@ impl /// > **Note**: We use a regular `poll` method instead of implementing `Stream`, /// > because we want the `Pool` to stay borrowed if necessary. pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll< - PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> + PoolEvent<'a, THandler, TTransErr> > { // Drain events resulting from forced disconnections. // @@ -828,22 +803,21 @@ impl EstablishedConnection<'_, TInEvent> { } /// An iterator over established connections in a pool. -pub struct EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> { - pool: &'a mut Pool, - ids: I +pub struct EstablishedConnectionIter<'a, I, THandler: IntoConnectionHandler, TTransErr> { + pool: &'a mut Pool, + ids: I, } // Note: Ideally this would be an implementation of `Iterator`, but that // requires GATs (cf. https://github.com/rust-lang/rust/issues/44265) and // a different definition of `Iterator`. -impl<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> - EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> +impl<'a, I, THandler: IntoConnectionHandler, TTransErr> EstablishedConnectionIter<'a, I, THandler, TTransErr> where I: Iterator { /// Obtains the next connection, if any. #[allow(clippy::should_implement_trait)] - pub fn next(&mut self) -> Option> + pub fn next(&mut self) -> Option>> { while let Some(id) = self.ids.next() { if self.pool.manager.is_established(&id) { // (*) @@ -865,7 +839,7 @@ where /// Returns the first connection, if any, consuming the iterator. pub fn into_first<'b>(mut self) - -> Option> + -> Option>> where 'a: 'b { while let Some(id) = self.ids.next() { diff --git a/core/src/network.rs b/core/src/network.rs index 584e098f..c069171c 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -42,6 +42,10 @@ use crate::{ ListenersStream, PendingConnectionError, Substream, + handler::{ + THandlerInEvent, + THandlerOutEvent, + }, manager::ManagerConfig, pool::{Pool, PoolEvent}, }, @@ -62,7 +66,7 @@ use std::{ }; /// Implementation of `Stream` that handles the nodes. -pub struct Network +pub struct Network where TTrans: Transport, THandler: IntoConnectionHandler, @@ -74,8 +78,7 @@ where listeners: ListenersStream, /// The nodes currently active. - pool: Pool::Error>, + pool: Pool, /// The ongoing dialing attempts. /// @@ -92,8 +95,8 @@ where dialing: FnvHashMap>, } -impl fmt::Debug for - Network +impl fmt::Debug for + Network where TTrans: fmt::Debug + Transport, THandler: fmt::Debug + ConnectionHandler, @@ -108,16 +111,16 @@ where } } -impl Unpin for - Network +impl Unpin for + Network where TTrans: Transport, THandler: IntoConnectionHandler, { } -impl - Network +impl + Network where TTrans: Transport, THandler: IntoConnectionHandler, @@ -128,15 +131,15 @@ where } } -impl - Network +impl + Network where TTrans: Transport + Clone, TMuxer: StreamMuxer, THandler: IntoConnectionHandler + Send + 'static, - THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send, ::OutboundOpenInfo: Send, ::Error: error::Error + Send, + THandler::Handler: ConnectionHandler> + Send, { /// Creates a new node events stream. pub fn new( @@ -223,8 +226,6 @@ where TTrans::Dial: Send + 'static, TMuxer: Send + Sync + 'static, TMuxer::OutboundSubstream: Send, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, { // If the address ultimately encapsulates an expected peer ID, dial that peer // such that any mismatch is detected. We do not "pop off" the `P2p` protocol @@ -313,7 +314,7 @@ where /// Obtains a view of a [`Peer`] with the given ID in the network. pub fn peer(&mut self, peer_id: PeerId) - -> Peer<'_, TTrans, TInEvent, TOutEvent, THandler> + -> Peer<'_, TTrans, THandler> { Peer::new(self, peer_id) } @@ -329,8 +330,6 @@ where handler: THandler, ) -> Result where - TInEvent: Send + 'static, - TOutEvent: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, TTrans: Transport, @@ -347,7 +346,7 @@ where } /// Provides an API similar to `Stream`, except that it cannot error. - pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> + pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll, THandlerOutEvent, THandler>> where TTrans: Transport, TTrans::Error: Send + 'static, @@ -355,10 +354,7 @@ where TTrans::ListenerUpgrade: Send + 'static, TMuxer: Send + Sync + 'static, TMuxer::OutboundSubstream: Send, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, THandler: IntoConnectionHandler + Send + 'static, - THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static, ::Error: error::Error + Send + 'static, { // Poll the listener(s) for new connections. @@ -455,8 +451,6 @@ where TTrans::Error: Send + 'static, TMuxer: Send + Sync + 'static, TMuxer::OutboundSubstream: Send, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, { dial_peer_impl(self.transport().clone(), &mut self.pool, &mut self.dialing, opts) } @@ -472,10 +466,9 @@ struct DialingOpts { } /// Standalone implementation of `Network::dial_peer` for more granular borrowing. -fn dial_peer_impl( +fn dial_peer_impl( transport: TTrans, - pool: &mut Pool::Error>, + pool: &mut Pool, dialing: &mut FnvHashMap>, opts: DialingOpts ) -> Result @@ -485,16 +478,12 @@ where ::OutboundOpenInfo: Send + 'static, THandler::Handler: ConnectionHandler< Substream = Substream, - InEvent = TInEvent, - OutEvent = TOutEvent, > + Send + 'static, TTrans: Transport, TTrans::Dial: Send + 'static, TTrans::Error: error::Error + Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, { // Ensure the address to dial encapsulates the `p2p` protocol for the // targeted peer, so that the transport has a "fully qualified" address @@ -531,13 +520,13 @@ where /// /// If the failed connection attempt was a dialing attempt and there /// are more addresses to try, new `DialingOpts` are returned. -fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler>( +fn on_connection_failed<'a, TTrans, THandler>( dialing: &mut FnvHashMap>, id: ConnectionId, endpoint: ConnectedPoint, error: PendingConnectionError, handler: Option, -) -> (Option>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>) +) -> (Option>, NetworkEvent<'a, TTrans, THandlerInEvent, THandlerOutEvent, THandler>) where TTrans: Transport, THandler: IntoConnectionHandler, diff --git a/core/src/network/peer.rs b/core/src/network/peer.rs index c62c937e..88c96aa0 100644 --- a/core/src/network/peer.rs +++ b/core/src/network/peer.rs @@ -34,6 +34,7 @@ use crate::{ IntoConnectionHandler, PendingConnection, Substream, + handler::THandlerInEvent, pool::Pool, }, PeerId @@ -53,30 +54,30 @@ use super::{Network, DialingOpts, DialError}; /// > **Note**: In any state there may always be a pending incoming /// > connection attempt from the peer, however, the remote identity /// > of a peer is only known once a connection is fully established. -pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler> +pub enum Peer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler { /// At least one established connection exists to the peer. - Connected(ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>), + Connected(ConnectedPeer<'a, TTrans, THandler>), /// There is an ongoing dialing (i.e. outgoing connection) attempt /// to the peer. There may already be other established connections /// to the peer. - Dialing(DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>), + Dialing(DialingPeer<'a, TTrans, THandler>), /// There exists no established connection to the peer and there is /// currently no ongoing dialing (i.e. outgoing connection) attempt /// in progress. - Disconnected(DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>), + Disconnected(DisconnectedPeer<'a, TTrans, THandler>), /// The peer represents the local node. Local, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for - Peer<'a, TTrans, TInEvent, TOutEvent, THandler> +impl<'a, TTrans, THandler> fmt::Debug for + Peer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler, @@ -106,14 +107,14 @@ where } } -impl<'a, TTrans, TInEvent, TOutEvent, THandler> - Peer<'a, TTrans, TInEvent, TOutEvent, THandler> +impl<'a, TTrans, THandler> + Peer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler, { pub(super) fn new( - network: &'a mut Network, + network: &'a mut Network, peer_id: PeerId ) -> Self { if peer_id == network.local_peer_id { @@ -133,39 +134,37 @@ where fn disconnected( - network: &'a mut Network, + network: &'a mut Network, peer_id: PeerId ) -> Self { Peer::Disconnected(DisconnectedPeer { network, peer_id }) } fn connected( - network: &'a mut Network, + network: &'a mut Network, peer_id: PeerId ) -> Self { Peer::Connected(ConnectedPeer { network, peer_id }) } fn dialing( - network: &'a mut Network, + network: &'a mut Network, peer_id: PeerId ) -> Self { Peer::Dialing(DialingPeer { network, peer_id }) } } -impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler> - Peer<'a, TTrans, TInEvent, TOutEvent, THandler> +impl<'a, TTrans, TMuxer, THandler> + Peer<'a, TTrans, THandler> where TTrans: Transport + Clone, TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, THandler: IntoConnectionHandler + Send + 'static, - THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send, + THandler::Handler: ConnectionHandler> + Send, ::OutboundOpenInfo: Send, ::Error: error::Error + Send + 'static, { @@ -209,7 +208,7 @@ where /// attempt to the first address fails. pub fn dial(self, address: Multiaddr, remaining: I, handler: THandler) -> Result< - (ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>), + (ConnectionId, DialingPeer<'a, TTrans, THandler>), DialError > where @@ -238,7 +237,7 @@ where /// /// Succeeds if the there is at least one established connection to the peer. pub fn into_connected(self) -> Option< - ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler> + ConnectedPeer<'a, TTrans, THandler> > { match self { Peer::Connected(peer) => Some(peer), @@ -252,7 +251,7 @@ where /// /// Succeeds if the there is at least one pending outgoing connection to the peer. pub fn into_dialing(self) -> Option< - DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler> + DialingPeer<'a, TTrans, THandler> > { match self { Peer::Dialing(peer) => Some(peer), @@ -265,7 +264,7 @@ where /// Converts the peer into a `DisconnectedPeer`, if neither an established connection /// nor a dialing attempt exists. pub fn into_disconnected(self) -> Option< - DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler> + DisconnectedPeer<'a, TTrans, THandler> > { match self { Peer::Disconnected(peer) => Some(peer), @@ -277,17 +276,17 @@ where /// The representation of a peer in a [`Network`] to whom at least /// one established connection exists. There may also be additional ongoing /// dialing attempts to the peer. -pub struct ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler> +pub struct ConnectedPeer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler, { - network: &'a mut Network, + network: &'a mut Network, peer_id: PeerId, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler> - ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler> +impl<'a, TTrans, THandler> + ConnectedPeer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler, @@ -297,13 +296,13 @@ where } /// Returns the `ConnectedPeer` into a `Peer`. - pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> { + pub fn into_peer(self) -> Peer<'a, TTrans, THandler> { Peer::Connected(self) } /// Obtains an established connection to the peer by ID. pub fn connection(&mut self, id: ConnectionId) - -> Option> + -> Option>> { self.network.pool.get_established(id) } @@ -323,7 +322,7 @@ where /// Converts this peer into a [`DialingPeer`], if there is an ongoing /// dialing attempt, `None` otherwise. pub fn into_dialing(self) -> Option< - DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler> + DialingPeer<'a, TTrans, THandler> > { if self.network.dialing.contains_key(&self.peer_id) { Some(DialingPeer { network: self.network, peer_id: self.peer_id }) @@ -336,18 +335,16 @@ where pub fn connections(&mut self) -> EstablishedConnectionIter< impl Iterator, - TInEvent, - TOutEvent, THandler, TTrans::Error, - ::Error> + > { self.network.pool.iter_peer_established(&self.peer_id) } /// Obtains some established connection to the peer. pub fn some_connection(&mut self) - -> EstablishedConnection + -> EstablishedConnection> { self.connections() .into_first() @@ -356,15 +353,15 @@ where /// Disconnects from the peer, closing all connections. pub fn disconnect(self) - -> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler> + -> DisconnectedPeer<'a, TTrans, THandler> { self.network.disconnect(&self.peer_id); DisconnectedPeer { network: self.network, peer_id: self.peer_id } } } -impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for - ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler> +impl<'a, TTrans, THandler> fmt::Debug for + ConnectedPeer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler, @@ -381,17 +378,17 @@ where /// The representation of a peer in a [`Network`] to whom a dialing /// attempt is ongoing. There may already exist other established /// connections to this peer. -pub struct DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler> +pub struct DialingPeer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler, { - network: &'a mut Network, + network: &'a mut Network, peer_id: PeerId, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler> - DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler> +impl<'a, TTrans, THandler> + DialingPeer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler, @@ -401,14 +398,14 @@ where } /// Returns the `DialingPeer` into a `Peer`. - pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> { + pub fn into_peer(self) -> Peer<'a, TTrans, THandler> { Peer::Dialing(self) } /// Disconnects from this peer, closing all established connections and /// aborting all dialing attempts. pub fn disconnect(self) - -> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler> + -> DisconnectedPeer<'a, TTrans, THandler> { self.network.disconnect(&self.peer_id); DisconnectedPeer { network: self.network, peer_id: self.peer_id } @@ -423,7 +420,7 @@ where /// Converts the peer into a `ConnectedPeer`, if an established connection exists. pub fn into_connected(self) - -> Option> + -> Option> { if self.is_connected() { Some(ConnectedPeer { peer_id: self.peer_id, network: self.network }) @@ -435,7 +432,7 @@ where /// Obtains a dialing attempt to the peer by connection ID of /// the current connection attempt. pub fn attempt(&mut self, id: ConnectionId) - -> Option> + -> Option>> { if let hash_map::Entry::Occupied(attempts) = self.network.dialing.entry(self.peer_id) { if let Some(pos) = attempts.get().iter().position(|s| s.current.0 == id) { @@ -448,13 +445,7 @@ where } /// Gets an iterator over all dialing (i.e. pending outgoing) connections to the peer. - pub fn attempts(&mut self) - -> DialingAttemptIter<'_, - TInEvent, - TOutEvent, - THandler, - TTrans::Error, - ::Error> + pub fn attempts(&mut self) -> DialingAttemptIter<'_, THandler, TTrans::Error> { DialingAttemptIter::new(&self.peer_id, &mut self.network.pool, &mut self.network.dialing) } @@ -463,7 +454,7 @@ where /// /// At least one dialing connection is guaranteed to exist on a `DialingPeer`. pub fn some_attempt(&mut self) - -> DialingAttempt<'_, TInEvent> + -> DialingAttempt<'_, THandlerInEvent> { self.attempts() .into_first() @@ -471,8 +462,8 @@ where } } -impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for - DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler> +impl<'a, TTrans, THandler> fmt::Debug for + DialingPeer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler, @@ -489,17 +480,17 @@ where /// The representation of a peer to whom the `Network` has currently /// neither an established connection, nor an ongoing dialing attempt /// initiated by the local peer. -pub struct DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler> +pub struct DisconnectedPeer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler, { peer_id: PeerId, - network: &'a mut Network, + network: &'a mut Network, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for - DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler> +impl<'a, TTrans, THandler> fmt::Debug for + DisconnectedPeer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler, @@ -511,8 +502,8 @@ where } } -impl<'a, TTrans, TInEvent, TOutEvent, THandler> - DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler> +impl<'a, TTrans, THandler> + DisconnectedPeer<'a, TTrans, THandler> where TTrans: Transport, THandler: IntoConnectionHandler, @@ -522,7 +513,7 @@ where } /// Returns the `DisconnectedPeer` into a `Peer`. - pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> { + pub fn into_peer(self) -> Peer<'a, TTrans, THandler> { Peer::Disconnected(self) } @@ -539,14 +530,12 @@ where connected: Connected, connection: Connection, ) -> Result< - ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>, + ConnectedPeer<'a, TTrans, THandler>, ConnectionLimit > where - TInEvent: Send + 'static, - TOutEvent: Send + 'static, THandler: Send + 'static, TTrans::Error: Send + 'static, - THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send, + THandler::Handler: ConnectionHandler> + Send, ::OutboundOpenInfo: Send, ::Error: error::Error + Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, @@ -631,11 +620,11 @@ impl<'a, TInEvent> } /// An iterator over the ongoing dialing attempts to a peer. -pub struct DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> { +pub struct DialingAttemptIter<'a, THandler: IntoConnectionHandler, TTransErr> { /// The peer whose dialing attempts are being iterated. peer_id: &'a PeerId, /// The underlying connection `Pool` of the `Network`. - pool: &'a mut Pool, + pool: &'a mut Pool, /// The state of all current dialing attempts known to the `Network`. /// /// Ownership of the `OccupiedEntry` for `peer_id` containing all attempts must be @@ -651,12 +640,12 @@ pub struct DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THan // Note: Ideally this would be an implementation of `Iterator`, but that // requires GATs (cf. https://github.com/rust-lang/rust/issues/44265) and // a different definition of `Iterator`. -impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> - DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> +impl<'a, THandler: IntoConnectionHandler, TTransErr> + DialingAttemptIter<'a, THandler, TTransErr> { fn new( peer_id: &'a PeerId, - pool: &'a mut Pool, + pool: &'a mut Pool, dialing: &'a mut FnvHashMap>, ) -> Self { let end = dialing.get(peer_id).map_or(0, |conns| conns.len()); @@ -665,7 +654,7 @@ impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> /// Obtains the next dialing connection, if any. #[allow(clippy::should_implement_trait)] - pub fn next(&mut self) -> Option> { + pub fn next(&mut self) -> Option>> { // If the number of elements reduced, the current `DialingAttempt` has been // aborted and iteration needs to continue from the previous position to // account for the removed element. @@ -693,7 +682,7 @@ impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> /// Returns the first connection, if any, consuming the iterator. pub fn into_first<'b>(self) - -> Option> + -> Option>> where 'a: 'b { if self.pos == self.end { diff --git a/core/tests/util.rs b/core/tests/util.rs index c20a2c59..0437f908 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -23,7 +23,7 @@ use libp2p_noise as noise; use libp2p_tcp as tcp; use std::{io, pin::Pin, task::Context, task::Poll}; -type TestNetwork = Network; +type TestNetwork = Network; type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox)>; /// Creates a new `TestNetwork` with a TCP transport. diff --git a/misc/multistream-select/tests/transport.rs b/misc/multistream-select/tests/transport.rs index ad4a9e36..85f51834 100644 --- a/misc/multistream-select/tests/transport.rs +++ b/misc/multistream-select/tests/transport.rs @@ -38,7 +38,7 @@ use rand::random; use std::{io, task::{Context, Poll}}; type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox)>; -type TestNetwork = Network; +type TestNetwork = Network; fn mk_transport(up: upgrade::Version) -> (PeerId, TestTransport) { let keys = identity::Keypair::generate_ed25519(); diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index d1a6e27b..b2916fa0 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -395,6 +395,7 @@ impl NetworkBehaviour for Floodsub { } /// Transmission between the `OneShotHandler` and the `FloodsubHandler`. +#[derive(Debug)] pub enum InnerMessage { /// We received an RPC from a remote. Rx(FloodsubRpc), diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 24f4189e..70b8fdd9 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -55,7 +55,7 @@ impl KademliaHandlerProto { } } -impl IntoProtocolsHandler for KademliaHandlerProto { +impl IntoProtocolsHandler for KademliaHandlerProto { type Handler = KademliaHandler; fn into_handler(self, _: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler { @@ -457,7 +457,7 @@ impl KademliaHandler { impl ProtocolsHandler for KademliaHandler where - TUserData: Clone + Send + 'static, + TUserData: Clone + fmt::Debug + Send + 'static, { type InEvent = KademliaHandlerIn; type OutEvent = KademliaHandlerEvent; diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index e06ec87d..3e7deca2 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -2,6 +2,10 @@ - Update dependencies. +- Implement `Debug` for `RelayHandlerEvent` and `RelayHandlerIn`. See [PR 2183]. + +[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183 + # 0.3.0 [2021-07-12] - Update dependencies. diff --git a/protocols/relay/src/handler.rs b/protocols/relay/src/handler.rs index e336b482..79aba915 100644 --- a/protocols/relay/src/handler.rs +++ b/protocols/relay/src/handler.rs @@ -33,6 +33,7 @@ use libp2p_swarm::{ ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use log::warn; +use std::fmt; use std::task::{Context, Poll}; use std::time::Duration; use wasm_timer::Instant; @@ -195,6 +196,55 @@ pub enum RelayHandlerEvent { }, } +impl fmt::Debug for RelayHandlerEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RelayHandlerEvent::IncomingRelayReq { + request_id, + src_addr, + req: _, + } => f + .debug_struct("RelayHandlerEvent::IncomingRelayReq") + .field("request_id", request_id) + .field("src_addr", src_addr) + .finish(), + RelayHandlerEvent::IncomingDstReq(_) => { + f.debug_tuple("RelayHandlerEvent::IncomingDstReq").finish() + } + RelayHandlerEvent::OutgoingRelayReqSuccess(peer_id, request_id, connection) => f + .debug_tuple("RelayHandlerEvent::OutgoingRelayReqSuccess") + .field(peer_id) + .field(request_id) + .field(connection) + .finish(), + RelayHandlerEvent::IncomingDstReqSuccess { + stream, + src_peer_id, + relay_peer_id, + relay_addr, + } => f + .debug_struct("RelayHandlerEvent::IncomingDstReqSuccess") + .field("stream", stream) + .field("src_peer_id", src_peer_id) + .field("relay_peer_id", relay_peer_id) + .field("relay_addr", relay_addr) + .finish(), + RelayHandlerEvent::OutgoingRelayReqError(peer_id, request_id) => f + .debug_tuple("RelayHandlerEvent::OutgoingRelayReqError") + .field(peer_id) + .field(request_id) + .finish(), + RelayHandlerEvent::OutgoingDstReqError { + src_connection_id, + incoming_relay_req_deny_fut: _, + } => f + .debug_struct("RelayHandlerEvent::OutgoingDstReqError") + .field("src_connection_id", src_connection_id) + .finish(), + } + } +} + /// Event that can be sent to the relay handler. pub enum RelayHandlerIn { /// Tell the handler whether it is handling a connection used to listen for incoming relayed @@ -233,6 +283,48 @@ pub enum RelayHandlerIn { }, } +impl fmt::Debug for RelayHandlerIn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RelayHandlerIn::UsedForListening(_) => { + f.debug_tuple("RelayHandlerIn::UsedForListening").finish() + } + RelayHandlerIn::DenyIncomingRelayReq(_) => f + .debug_tuple("RelayHandlerIn::DenyIncomingRelayReq") + .finish(), + RelayHandlerIn::AcceptDstReq(_) => { + f.debug_tuple("RelayHandlerIn::AcceptDstReq").finish() + } + RelayHandlerIn::DenyDstReq(_) => f.debug_tuple("RelayHandlerIn::DenyDstReq").finish(), + RelayHandlerIn::OutgoingRelayReq { + src_peer_id, + dst_peer_id, + request_id, + dst_addr, + } => f + .debug_struct("RelayHandlerIn::OutgoingRelayReq") + .field("src_peer_id", src_peer_id) + .field("dst_peer_id", dst_peer_id) + .field("request_id", request_id) + .field("dst_addr", dst_addr) + .finish(), + RelayHandlerIn::OutgoingDstReq { + src_peer_id, + src_addr, + src_connection_id, + request_id, + incoming_relay_req: _, + } => f + .debug_struct("RelayHandlerIn::OutgoingDstReq") + .field("src_peer_id", src_peer_id) + .field("src_addr", src_addr) + .field("src_connection_id", src_connection_id) + .field("request_id", request_id) + .finish(), + } + } +} + impl RelayHandler { /// Builds a new `RelayHandler`. pub fn new( diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 71188ec9..973fbe3f 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -2,6 +2,11 @@ - Update dependencies. +- Manually implement `Debug` for `RequestResponseHandlerEvent` and + `RequestProtocol`. See [PR 2183]. + +[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183 + # 0.12.0 [2021-07-12] - Update dependencies. diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index b3f11888..ddb9f042 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -46,6 +46,7 @@ use libp2p_swarm::{ use smallvec::SmallVec; use std::{ collections::VecDeque, + fmt, io, sync::{atomic::{AtomicU64, Ordering}, Arc}, time::Duration, @@ -114,7 +115,6 @@ where /// The events emitted by the [`RequestResponseHandler`]. #[doc(hidden)] -#[derive(Debug)] pub enum RequestResponseHandlerEvent where TCodec: RequestResponseCodec @@ -147,6 +147,37 @@ where InboundUnsupportedProtocols(RequestId), } +impl fmt::Debug for RequestResponseHandlerEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + RequestResponseHandlerEvent::Request { request_id, request: _, sender: _ } => f.debug_struct("RequestResponseHandlerEvent::Request") + .field("request_id", request_id) + .finish(), + RequestResponseHandlerEvent::Response { request_id, response: _ } => f.debug_struct("RequestResponseHandlerEvent::Response") + .field("request_id", request_id) + .finish(), + RequestResponseHandlerEvent::ResponseSent(request_id) => f.debug_tuple("RequestResponseHandlerEvent::ResponseSent") + .field(request_id) + .finish(), + RequestResponseHandlerEvent::ResponseOmission(request_id) => f.debug_tuple("RequestResponseHandlerEvent::ResponseOmission") + .field(request_id) + .finish(), + RequestResponseHandlerEvent::OutboundTimeout(request_id) => f.debug_tuple("RequestResponseHandlerEvent::OutboundTimeout") + .field(request_id) + .finish(), + RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => f.debug_tuple("RequestResponseHandlerEvent::OutboundUnsupportedProtocols") + .field(request_id) + .finish(), + RequestResponseHandlerEvent::InboundTimeout(request_id) => f.debug_tuple("RequestResponseHandlerEvent::InboundTimeout") + .field(request_id) + .finish(), + RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => f.debug_tuple("RequestResponseHandlerEvent::InboundUnsupportedProtocols") + .field(request_id) + .finish(), + } + } +} + impl ProtocolsHandler for RequestResponseHandler where TCodec: RequestResponseCodec + Send + Clone + 'static, @@ -345,4 +376,3 @@ where Poll::Pending } } - diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs index 404fbd5c..cede827d 100644 --- a/protocols/request-response/src/handler/protocol.rs +++ b/protocols/request-response/src/handler/protocol.rs @@ -30,7 +30,7 @@ use futures::{channel::oneshot, future::BoxFuture, prelude::*}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_swarm::NegotiatedSubstream; use smallvec::SmallVec; -use std::io; +use std::{fmt, io}; /// The level of support for a particular protocol. #[derive(Debug, Clone)] @@ -127,7 +127,6 @@ where /// Request substream upgrade protocol. /// /// Sends a request and receives a response. -#[derive(Debug)] pub struct RequestProtocol where TCodec: RequestResponseCodec @@ -138,6 +137,17 @@ where pub(crate) request: TCodec::Request, } +impl fmt::Debug for RequestProtocol +where + TCodec: RequestResponseCodec, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RequestProtocol") + .field("request_id", &self.request_id) + .finish() + } +} + impl UpgradeInfo for RequestProtocol where TCodec: RequestResponseCodec diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index fd90965f..281f6ca3 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -10,8 +10,12 @@ trait parameters on `Swarm` (previously `ExpandedSwarm`), deriving parameters through associated types on `TBehaviour`. See [PR 2182]. +- Require `ProtocolsHandler::{InEvent,OutEvent,Error}` to implement `Debug` (see + [PR 2183]). + [PR 2150]: https://github.com/libp2p/rust-libp2p/pull/2150/ [PR 2182]: https://github.com/libp2p/rust-libp2p/pull/2182 +[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183 # 0.30.0 [2021-07-12] diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index cecbe7ef..ae23b94b 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -100,6 +100,7 @@ use libp2p_core::{Executor, Multiaddr, Negotiated, PeerId, Transport, connection ConnectionLimit, ConnectedPoint, EstablishedConnection, + ConnectionHandler, IntoConnectionHandler, ListenerId, PendingConnectionError, @@ -278,8 +279,6 @@ where { network: Network< transport::Boxed<(PeerId, StreamMuxerBox)>, - THandlerInEvent, - THandlerOutEvent, NodeHandlerWrapperBuilder>, >, @@ -682,7 +681,7 @@ where } }, PendingNotifyHandler::Any(ids) => { - if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) { + if let Some((event, ids)) = notify_any::<_, _, TBehaviour>(ids, &mut peer, event, cx) { let handler = PendingNotifyHandler::Any(ids); this.pending_event = Some((peer_id, handler, event)); return Poll::Pending @@ -759,7 +758,7 @@ where } NotifyHandler::Any => { let ids = peer.connections().into_ids().collect(); - if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) { + if let Some((event, ids)) = notify_any::<_, _, TBehaviour>(ids, &mut peer, event, cx) { let handler = PendingNotifyHandler::Any(ids); this.pending_event = Some((peer_id, handler, event)); return Poll::Pending @@ -838,15 +837,17 @@ fn notify_one<'a, THandlerInEvent>( /// /// Returns `None` if either all connections are closing or the event /// was successfully sent to a handler, in either case the event is consumed. -fn notify_any<'a, TTrans, THandlerInEvent, THandlerOutEvent, THandler>( +fn notify_any<'a, TTrans, THandler, TBehaviour>( ids: SmallVec<[ConnectionId; 10]>, - peer: &mut ConnectedPeer<'a, TTrans, THandlerInEvent, THandlerOutEvent, THandler>, - event: THandlerInEvent, + peer: &mut ConnectedPeer<'a, TTrans, THandler>, + event: THandlerInEvent, cx: &mut Context<'_>, -) -> Option<(THandlerInEvent, SmallVec<[ConnectionId; 10]>)> +) -> Option<(THandlerInEvent, SmallVec<[ConnectionId; 10]>)> where TTrans: Transport, + TBehaviour: NetworkBehaviour, THandler: IntoConnectionHandler, + THandler::Handler: ConnectionHandler, OutEvent = THandlerOutEvent> { let mut pending = SmallVec::new(); let mut event = Some(event); // (1) diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index 42c61101..58ae3516 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -101,11 +101,11 @@ pub use select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect}; /// continue reading data until the remote closes its side of the connection. pub trait ProtocolsHandler: Send + 'static { /// Custom event that can be received from the outside. - type InEvent: Send + 'static; + type InEvent: fmt::Debug + Send + 'static; /// Custom event that can be produced by the handler and that will be returned to the outside. - type OutEvent: Send + 'static; + type OutEvent: fmt::Debug + Send + 'static; /// The type of errors returned by [`ProtocolsHandler::poll`]. - type Error: error::Error + Send + 'static; + type Error: error::Error + fmt::Debug + Send + 'static; /// The inbound upgrade for the protocol(s) used by the handler. type InboundProtocol: InboundUpgradeSend; /// The outbound upgrade for the protocol(s) used by the handler. diff --git a/swarm/src/protocols_handler/map_in.rs b/swarm/src/protocols_handler/map_in.rs index 7c007db6..77ac5f91 100644 --- a/swarm/src/protocols_handler/map_in.rs +++ b/swarm/src/protocols_handler/map_in.rs @@ -27,7 +27,7 @@ use crate::protocols_handler::{ ProtocolsHandlerUpgrErr }; use libp2p_core::Multiaddr; -use std::{marker::PhantomData, task::Context, task::Poll}; +use std::{fmt::Debug, marker::PhantomData, task::Context, task::Poll}; /// Wrapper around a protocol handler that turns the input event into something else. pub struct MapInEvent { @@ -51,7 +51,7 @@ impl ProtocolsHandler for MapInEvent Option, - TNewIn: Send + 'static, + TNewIn: Debug + Send + 'static, TMap: Send + 'static, { type InEvent = TNewIn; diff --git a/swarm/src/protocols_handler/map_out.rs b/swarm/src/protocols_handler/map_out.rs index 292f0223..9df2ace9 100644 --- a/swarm/src/protocols_handler/map_out.rs +++ b/swarm/src/protocols_handler/map_out.rs @@ -27,6 +27,7 @@ use crate::protocols_handler::{ ProtocolsHandlerUpgrErr }; use libp2p_core::Multiaddr; +use std::fmt::Debug; use std::task::{Context, Poll}; /// Wrapper around a protocol handler that turns the output event into something else. @@ -49,7 +50,7 @@ impl ProtocolsHandler for MapOutEvent TNewOut, - TNewOut: Send + 'static, + TNewOut: Debug + Send + 'static, TMap: Send + 'static, { type InEvent = TProtoHandler::InEvent; diff --git a/swarm/src/protocols_handler/multi.rs b/swarm/src/protocols_handler/multi.rs index f23de96c..64821ca3 100644 --- a/swarm/src/protocols_handler/multi.rs +++ b/swarm/src/protocols_handler/multi.rs @@ -43,7 +43,7 @@ use std::{ cmp, collections::{HashMap, HashSet}, error, - fmt, + fmt::{self, Debug}, hash::Hash, iter::{self, FromIterator}, task::{Context, Poll}, @@ -88,7 +88,7 @@ where impl ProtocolsHandler for MultiHandler where - K: Clone + Hash + Eq + Send + 'static, + K: Clone + Debug + Hash + Eq + Send + 'static, H: ProtocolsHandler, H::InboundProtocol: InboundUpgradeSend, H::OutboundProtocol: OutboundUpgradeSend @@ -312,7 +312,7 @@ where impl IntoProtocolsHandler for IntoMultiHandler where - K: Clone + Eq + Hash + Send + 'static, + K: Debug + Clone + Eq + Hash + Send + 'static, H: IntoProtocolsHandler { type Handler = MultiHandler; diff --git a/swarm/src/protocols_handler/one_shot.rs b/swarm/src/protocols_handler/one_shot.rs index 3baf779a..d19dd89d 100644 --- a/swarm/src/protocols_handler/one_shot.rs +++ b/swarm/src/protocols_handler/one_shot.rs @@ -28,7 +28,7 @@ use crate::protocols_handler::{ }; use smallvec::SmallVec; -use std::{error, task::Context, task::Poll, time::Duration}; +use std::{error, fmt::Debug, task::Context, task::Poll, time::Duration}; use wasm_timer::Instant; /// A `ProtocolsHandler` that opens a new substream for each request. @@ -119,12 +119,12 @@ where impl ProtocolsHandler for OneShotHandler where TInbound: InboundUpgradeSend + Send + 'static, - TOutbound: OutboundUpgradeSend, + TOutbound: Debug + OutboundUpgradeSend, TInbound::Output: Into, TOutbound::Output: Into, TOutbound::Error: error::Error + Send + 'static, SubstreamProtocol: Clone, - TEvent: Send + 'static, + TEvent: Debug + Send + 'static, { type InEvent = TOutbound; type OutEvent = TEvent;