Remove ConnectionInfo trait (#1813)

In all cases, we pass the PeerId directly as the connection info.
The flexbility of doing something different here was originally
envisioned but turned out to be never needed.

For reference see: https://github.com/libp2p/rust-libp2p/issues/1798#issuecomment-714526056

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Thomas Eizinger 2020-10-31 01:51:27 +11:00 committed by GitHub
parent 421ae9d9c9
commit 335e55e60d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 555 additions and 589 deletions

View File

@ -23,6 +23,10 @@
- [`parity-multiaddr` CHANGELOG](misc/multiaddr/CHANGELOG.md)
- [`libp2p-core-derive` CHANGELOG](misc/core-derive/CHANGELOG.md)
# Version 0.30.0 [unreleased]
- Update `libp2p-core` and all its dependers.
# Version 0.29.1 [2020-10-20]
- Update `libp2p-core`.

View File

@ -2,7 +2,7 @@
name = "libp2p"
edition = "2018"
description = "Peer-to-peer networking library"
version = "0.29.1"
version = "0.30.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -62,22 +62,22 @@ atomic = "0.5.0"
bytes = "0.5"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.23.1", path = "core" }
libp2p-core = { version = "0.24.0", path = "core" }
libp2p-core-derive = { version = "0.20.2", path = "misc/core-derive" }
libp2p-floodsub = { version = "0.23.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.23.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.23.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.24.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.23.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.25.0", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.23.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.23.0", path = "protocols/plaintext", optional = true }
libp2p-floodsub = { version = "0.24.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.24.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.24.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.25.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.24.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.26.0", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.24.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.24.0", path = "protocols/plaintext", optional = true }
libp2p-pnet = { version = "0.19.2", path = "protocols/pnet", optional = true }
libp2p-request-response = { version = "0.4.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.23.0", path = "swarm" }
libp2p-uds = { version = "0.23.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.23.0", path = "transports/wasm-ext", optional = true }
libp2p-yamux = { version = "0.26.0", path = "muxers/yamux", optional = true }
libp2p-request-response = { version = "0.5.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.24.0", path = "swarm" }
libp2p-uds = { version = "0.24.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.24.0", path = "transports/wasm-ext", optional = true }
libp2p-yamux = { version = "0.27.0", path = "muxers/yamux", optional = true }
multiaddr = { package = "parity-multiaddr", version = "0.9.3", path = "misc/multiaddr" }
multihash = "0.11.0"
parking_lot = "0.11.0"
@ -86,11 +86,11 @@ smallvec = "1.0"
wasm-timer = "0.2.4"
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.23.0", path = "protocols/deflate", optional = true }
libp2p-dns = { version = "0.23.0", path = "transports/dns", optional = true }
libp2p-mdns = { version = "0.23.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.23.0", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.24.0", path = "transports/websocket", optional = true }
libp2p-deflate = { version = "0.24.0", path = "protocols/deflate", optional = true }
libp2p-dns = { version = "0.24.0", path = "transports/dns", optional = true }
libp2p-mdns = { version = "0.24.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.24.0", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.25.0", path = "transports/websocket", optional = true }
[dev-dependencies]
async-std = "1.6.2"

View File

@ -1,3 +1,9 @@
# 0.24.0 [unreleased]
- Remove `ConnectionInfo` trait and replace it with `PeerId`
everywhere. This was already effectively the case because
`ConnectionInfo` was implemented on `PeerId`.
# 0.23.1 [2020-10-20]
- Update dependencies.

View File

@ -2,7 +2,7 @@
name = "libp2p-core"
edition = "2018"
description = "Core traits and structs of libp2p"
version = "0.23.1"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"

View File

@ -159,37 +159,11 @@ impl ConnectedPoint {
/// Information about a successfully established connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Connected<I> {
pub struct Connected {
/// The connected endpoint, including network address information.
pub endpoint: ConnectedPoint,
/// Information obtained from the transport.
pub info: I,
}
impl<I> Connected<I>
where
I: ConnectionInfo
{
pub fn peer_id(&self) -> &I::PeerId {
self.info.peer_id()
}
}
/// Information about a connection.
pub trait ConnectionInfo {
/// Identity of the node we are connected to.
type PeerId: Eq + Hash;
/// Returns the identity of the node we are connected to on this connection.
fn peer_id(&self) -> &Self::PeerId;
}
impl ConnectionInfo for PeerId {
type PeerId = PeerId;
fn peer_id(&self) -> &PeerId {
self
}
pub peer_id: PeerId,
}
/// Event generated by a [`Connection`].
@ -334,12 +308,12 @@ impl<'a> IncomingInfo<'a> {
/// Borrowed information about an outgoing connection currently being negotiated.
#[derive(Debug, Copy, Clone)]
pub struct OutgoingInfo<'a, TPeerId> {
pub struct OutgoingInfo<'a> {
pub address: &'a Multiaddr,
pub peer_id: Option<&'a TPeerId>,
pub peer_id: Option<&'a PeerId>,
}
impl<'a, TPeerId> OutgoingInfo<'a, TPeerId> {
impl<'a> OutgoingInfo<'a> {
/// Builds a `ConnectedPoint` corresponding to the outgoing connection.
pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Dialer {

View File

@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{Multiaddr, PeerId};
use crate::Multiaddr;
use std::{task::Context, task::Poll};
use super::{Connected, SubstreamEndpoint};
@ -69,7 +69,7 @@ pub trait ConnectionHandler {
}
/// Prototype for a `ConnectionHandler`.
pub trait IntoConnectionHandler<TConnInfo = PeerId> {
pub trait IntoConnectionHandler {
/// The node handler.
type Handler: ConnectionHandler;
@ -77,16 +77,16 @@ pub trait IntoConnectionHandler<TConnInfo = PeerId> {
///
/// The implementation is given a `Connected` value that holds information about
/// the newly established connection for which a handler should be created.
fn into_handler(self, connected: &Connected<TConnInfo>) -> Self::Handler;
fn into_handler(self, connected: &Connected) -> Self::Handler;
}
impl<T, TConnInfo> IntoConnectionHandler<TConnInfo> for T
impl<T> IntoConnectionHandler for T
where
T: ConnectionHandler
{
type Handler = Self;
fn into_handler(self, _: &Connected<TConnInfo>) -> Self {
fn into_handler(self, _: &Connected) -> Self {
self
}
}

View File

@ -70,7 +70,7 @@ mod task;
// manager.
/// The result of a pending connection attempt.
type ConnectResult<C, M, TE> = Result<(Connected<C>, M), PendingConnectionError<TE>>;
type ConnectResult<M, TE> = Result<(Connected, M), PendingConnectionError<TE>>;
/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
@ -88,7 +88,7 @@ impl ConnectionId {
}
/// A connection `Manager` orchestrates the I/O of a set of connections.
pub struct Manager<I, O, H, E, HE, C> {
pub struct Manager<I, O, H, E, HE> {
/// The tasks of the managed connections.
///
/// Each managed connection is associated with a (background) task
@ -96,7 +96,7 @@ pub struct Manager<I, O, H, E, HE, C> {
/// background task via a channel. Closing that channel (i.e. dropping
/// the sender in the associated `TaskInfo`) stops the background task,
/// which will attempt to gracefully close the connection.
tasks: FnvHashMap<TaskId, TaskInfo<I, C>>,
tasks: FnvHashMap<TaskId, TaskInfo<I>>,
/// Next available identifier for a new connection / task.
next_task_id: TaskId,
@ -115,15 +115,13 @@ pub struct Manager<I, O, H, E, HE, C> {
/// Sender distributed to managed tasks for reporting events back
/// to the manager.
events_tx: mpsc::Sender<task::Event<O, H, E, HE, C>>,
events_tx: mpsc::Sender<task::Event<O, H, E, HE>>,
/// Receiver for events reported from managed tasks.
events_rx: mpsc::Receiver<task::Event<O, H, E, HE, C>>
events_rx: mpsc::Receiver<task::Event<O, H, E, HE>>
}
impl<I, O, H, E, HE, C> fmt::Debug for Manager<I, O, H, E, HE, C>
where
C: fmt::Debug,
impl<I, O, H, E, HE> fmt::Debug for Manager<I, O, H, E, HE>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_map()
@ -163,25 +161,25 @@ impl Default for ManagerConfig {
/// Contains the sender to deliver event messages to the task, and
/// the associated user data.
#[derive(Debug)]
struct TaskInfo<I, C> {
struct TaskInfo<I> {
/// Channel endpoint to send messages to the task.
sender: mpsc::Sender<task::Command<I>>,
/// The state of the task as seen by the `Manager`.
state: TaskState<C>,
state: TaskState,
}
/// Internal state of a running task as seen by the `Manager`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TaskState<C> {
enum TaskState {
/// The connection is being established.
Pending,
/// The connection is established.
Established(Connected<C>),
Established(Connected),
}
/// Events produced by the [`Manager`].
#[derive(Debug)]
pub enum Event<'a, I, O, H, TE, HE, C> {
pub enum Event<'a, I, O, H, TE, HE> {
/// A connection attempt has failed.
PendingConnectionError {
/// The connection ID.
@ -205,7 +203,7 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
/// > the manager.
id: ConnectionId,
/// Information about the closed connection.
connected: Connected<C>,
connected: Connected,
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<HE>>,
@ -214,13 +212,13 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
/// A connection has been established.
ConnectionEstablished {
/// The entry associated with the new connection.
entry: EstablishedEntry<'a, I, C>,
entry: EstablishedEntry<'a, I>,
},
/// A connection handler has produced an event.
ConnectionEvent {
/// The entry associated with the connection that produced the event.
entry: EstablishedEntry<'a, I, C>,
entry: EstablishedEntry<'a, I>,
/// The produced event.
event: O
},
@ -228,7 +226,7 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
/// A connection to a node has changed its address.
AddressChange {
/// The entry associated with the connection that changed address.
entry: EstablishedEntry<'a, I, C>,
entry: EstablishedEntry<'a, I>,
/// The former [`ConnectedPoint`].
old_endpoint: ConnectedPoint,
/// The new [`ConnectedPoint`].
@ -236,7 +234,7 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
},
}
impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
/// Creates a new connection manager.
pub fn new(config: ManagerConfig) -> Self {
let (tx, rx) = mpsc::channel(config.task_event_buffer_size);
@ -261,11 +259,10 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
O: Send + 'static,
TE: error::Error + Send + 'static,
HE: error::Error + Send + 'static,
C: Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
F: Future<Output = ConnectResult<C, M, TE>> + Send + 'static,
H: IntoConnectionHandler<C> + Send + 'static,
F: Future<Output = ConnectResult<M, TE>> + Send + 'static,
H: IntoConnectionHandler + Send + 'static,
H::Handler: ConnectionHandler<
Substream = Substream<M>,
InEvent = I,
@ -291,9 +288,9 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
}
/// Adds an existing connection to the manager.
pub fn add<M>(&mut self, conn: Connection<M, H::Handler>, info: Connected<C>) -> ConnectionId
pub fn add<M>(&mut self, conn: Connection<M, H::Handler>, info: Connected) -> ConnectionId
where
H: IntoConnectionHandler<C> + Send + 'static,
H: IntoConnectionHandler + Send + 'static,
H::Handler: ConnectionHandler<
Substream = Substream<M>,
InEvent = I,
@ -307,7 +304,6 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
O: Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
C: Send + 'static
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;
@ -317,7 +313,7 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
sender: tx, state: TaskState::Established(info)
});
let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _, _, _, _>>> =
let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _, _, _>>> =
Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn));
if let Some(executor) = &mut self.executor {
@ -330,7 +326,7 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
}
/// Gets an entry for a managed connection, if it exists.
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, I, C>> {
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, I>> {
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
Some(Entry::new(task))
} else {
@ -347,7 +343,7 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
}
/// Polls the manager for events relating to the managed connections.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, I, O, H, TE, HE, C>> {
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, I, O, H, TE, HE>> {
// Advance the content of `local_spawns`.
while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}
@ -419,13 +415,13 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
/// An entry for a connection in the manager.
#[derive(Debug)]
pub enum Entry<'a, I, C> {
Pending(PendingEntry<'a, I, C>),
Established(EstablishedEntry<'a, I, C>)
pub enum Entry<'a, I> {
Pending(PendingEntry<'a, I>),
Established(EstablishedEntry<'a, I>)
}
impl<'a, I, C> Entry<'a, I, C> {
fn new(task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I, C>>) -> Self {
impl<'a, I> Entry<'a, I> {
fn new(task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I>>) -> Self {
match &task.get().state {
TaskState::Pending => Entry::Pending(PendingEntry { task }),
TaskState::Established(_) => Entry::Established(EstablishedEntry { task })
@ -435,11 +431,11 @@ impl<'a, I, C> Entry<'a, I, C> {
/// An entry for a managed connection that is considered established.
#[derive(Debug)]
pub struct EstablishedEntry<'a, I, C> {
task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I, C>>,
pub struct EstablishedEntry<'a, I> {
task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I>>,
}
impl<'a, I, C> EstablishedEntry<'a, I, C> {
impl<'a, I> EstablishedEntry<'a, I> {
/// (Asynchronously) sends an event to the connection handler.
///
/// If the handler is not ready to receive the event, either because
@ -490,7 +486,7 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
}
/// Obtains information about the established connection.
pub fn connected(&self) -> &Connected<C> {
pub fn connected(&self) -> &Connected {
match &self.task.get().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()")
@ -501,7 +497,7 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
/// the command channel to the background task of the connection,
/// which will thus drop the connection asap without an orderly
/// close or emitting another event.
pub fn remove(self) -> Connected<C> {
pub fn remove(self) -> Connected {
match self.task.remove().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()")
@ -517,11 +513,11 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
/// An entry for a managed connection that is currently being established
/// (i.e. pending).
#[derive(Debug)]
pub struct PendingEntry<'a, I, C> {
task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I, C>>
pub struct PendingEntry<'a, I> {
task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I>>
}
impl<'a, I, C> PendingEntry<'a, I, C> {
impl<'a, I> PendingEntry<'a, I> {
/// Returns the connection id.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())

View File

@ -53,9 +53,9 @@ pub enum Command<T> {
/// Events that a task can emit to its manager.
#[derive(Debug)]
pub enum Event<T, H, TE, HE, C> {
pub enum Event<T, H, TE, HE> {
/// A connection to a node has succeeded.
Established { id: TaskId, info: Connected<C> },
Established { id: TaskId, info: Connected },
/// A pending connection failed.
Failed { id: TaskId, error: PendingConnectionError<TE>, handler: H },
/// A node we are connected to has changed its address.
@ -69,7 +69,7 @@ pub enum Event<T, H, TE, HE, C> {
Closed { id: TaskId, error: Option<ConnectionError<HE>> }
}
impl<T, H, TE, HE, C> Event<T, H, TE, HE, C> {
impl<T, H, TE, HE> Event<T, H, TE, HE> {
pub fn id(&self) -> &TaskId {
match self {
Event::Established { id, .. } => id,
@ -82,35 +82,35 @@ impl<T, H, TE, HE, C> Event<T, H, TE, HE, C> {
}
/// A `Task` is a [`Future`] that handles a single connection.
pub struct Task<F, M, H, I, O, E, C>
pub struct Task<F, M, H, I, O, E>
where
M: StreamMuxer,
H: IntoConnectionHandler<C>,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
/// The ID of this task.
id: TaskId,
/// Sender to emit events to the manager of this task.
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error, C>>,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
/// Receiver for commands sent by the manager of this task.
commands: stream::Fuse<mpsc::Receiver<Command<I>>>,
/// Inner state of this `Task`.
state: State<F, M, H, O, E, C>,
state: State<F, M, H, O, E>,
}
impl<F, M, H, I, O, E, C> Task<F, M, H, I, O, E, C>
impl<F, M, H, I, O, E> Task<F, M, H, I, O, E>
where
M: StreamMuxer,
H: IntoConnectionHandler<C>,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
/// Create a new task to connect and handle some node.
pub fn pending(
id: TaskId,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error, C>>,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
commands: mpsc::Receiver<Command<I>>,
future: F,
handler: H
@ -129,7 +129,7 @@ where
/// Create a task for an existing node we are already connected to.
pub fn established(
id: TaskId,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error, C>>,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>,
commands: mpsc::Receiver<Command<I>>,
connection: Connection<M, H::Handler>
) -> Self {
@ -143,10 +143,10 @@ where
}
/// The state associated with the `Task` of a connection.
enum State<F, M, H, O, E, C>
enum State<F, M, H, O, E>
where
M: StreamMuxer,
H: IntoConnectionHandler<C>,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
/// The connection is being negotiated.
@ -165,32 +165,32 @@ where
/// is polled for new events in this state, otherwise the event
/// must be sent to the `Manager` before the connection can be
/// polled again.
event: Option<Event<O, H, E, <H::Handler as ConnectionHandler>::Error, C>>
event: Option<Event<O, H, E, <H::Handler as ConnectionHandler>::Error>>
},
/// The connection is closing (active close).
Closing(Close<M>),
/// The task is terminating with a final event for the `Manager`.
Terminating(Event<O, H, E, <H::Handler as ConnectionHandler>::Error, C>),
Terminating(Event<O, H, E, <H::Handler as ConnectionHandler>::Error>),
/// The task has finished.
Done
}
impl<F, M, H, I, O, E, C> Unpin for Task<F, M, H, I, O, E, C>
impl<F, M, H, I, O, E> Unpin for Task<F, M, H, I, O, E>
where
M: StreamMuxer,
H: IntoConnectionHandler<C>,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
}
impl<F, M, H, I, O, E, C> Future for Task<F, M, H, I, O, E, C>
impl<F, M, H, I, O, E> Future for Task<F, M, H, I, O, E>
where
M: StreamMuxer,
F: Future<Output = ConnectResult<C, M, E>>,
H: IntoConnectionHandler<C>,
F: Future<Output = ConnectResult<M, E>>,
H: IntoConnectionHandler,
H::Handler: ConnectionHandler<Substream = Substream<M>, InEvent = I, OutEvent = O>
{
type Output = ();

View File

@ -29,7 +29,6 @@ use crate::{
ConnectionLimit,
ConnectionError,
ConnectionHandler,
ConnectionInfo,
IncomingInfo,
IntoConnectionHandler,
OutgoingInfo,
@ -43,11 +42,11 @@ use either::Either;
use fnv::FnvHashMap;
use futures::prelude::*;
use smallvec::SmallVec;
use std::{convert::TryFrom as _, error, fmt, hash::Hash, num::NonZeroU32, task::Context, task::Poll};
use std::{convert::TryFrom as _, error, fmt, num::NonZeroU32, task::Context, task::Poll};
/// A connection `Pool` manages a set of connections for each peer.
pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo = PeerId, TPeerId = PeerId> {
local_id: TPeerId,
pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
local_id: PeerId,
/// The configuration of the pool.
limits: PoolLimits,
@ -56,24 +55,24 @@ pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo
/// established and pending connections.
///
/// For every established connection there is a corresponding entry in `established`.
manager: Manager<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo>,
manager: Manager<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
/// The managed connections of each peer that are currently considered
/// established, as witnessed by the associated `ConnectedPoint`.
established: FnvHashMap<TPeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
established: FnvHashMap<PeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
/// The pending connections that are currently being negotiated.
pending: FnvHashMap<ConnectionId, (ConnectedPoint, Option<TPeerId>)>,
pending: FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,
/// Established connections that have been closed in the context of
/// a [`Pool::disconnect`] in order to emit a `ConnectionClosed`
/// event for each. Every `ConnectionEstablished` event must be
/// paired with (eventually) a `ConnectionClosed`.
disconnected: Vec<Disconnected<TConnInfo>>,
disconnected: Vec<Disconnected>,
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> fmt::Debug
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
// TODO: More useful debug impl?
@ -83,14 +82,14 @@ for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeer
}
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> Unpin
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> Unpin
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {}
/// Event that can happen on the `Pool`.
pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {
pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
/// A new connection has been established.
ConnectionEstablished {
connection: EstablishedConnection<'a, TInEvent, TConnInfo>,
connection: EstablishedConnection<'a, TInEvent>,
num_established: NonZeroU32,
},
@ -108,12 +107,12 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC
ConnectionClosed {
id: ConnectionId,
/// Information about the connection that errored.
connected: Connected<TConnInfo>,
connected: Connected,
/// The error that occurred, if any. If `None`, the connection
/// was closed by the local peer.
error: Option<ConnectionError<THandlerErr>>,
/// A reference to the pool that used to manage the connection.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
/// The remaining number of established connections to the same peer.
num_established: u32,
},
@ -130,15 +129,15 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC
/// if the connection failed before the handler was consumed.
handler: Option<THandler>,
/// The (expected) peer of the failed connection.
peer: Option<TPeerId>,
peer: Option<PeerId>,
/// A reference to the pool that managed the connection.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
},
/// A node has produced an event.
ConnectionEvent {
/// The connection that has generated the event.
connection: EstablishedConnection<'a, TInEvent, TConnInfo>,
connection: EstablishedConnection<'a, TInEvent>,
/// The produced event.
event: TOutEvent,
},
@ -146,7 +145,7 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC
/// The connection to a node has changed its address.
AddressChange {
/// The connection that has changed address.
connection: EstablishedConnection<'a, TInEvent, TConnInfo>,
connection: EstablishedConnection<'a, TInEvent>,
/// The new endpoint.
new_endpoint: ConnectedPoint,
/// The old endpoint.
@ -154,13 +153,12 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC
},
}
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> fmt::Debug
for PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
for PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
where
TOutEvent: fmt::Debug,
TTransErr: fmt::Debug,
THandlerErr: fmt::Debug,
TConnInfo: fmt::Debug,
TInEvent: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
@ -200,14 +198,12 @@ where
}
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
where
TPeerId: Eq + Hash,
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
{
/// Creates a new empty `Pool`.
pub fn new(
local_id: TPeerId,
local_id: PeerId,
manager_config: ManagerConfig,
limits: PoolLimits
) -> Self {
@ -238,11 +234,10 @@ where
info: IncomingInfo<'_>,
) -> Result<ConnectionId, ConnectionLimit>
where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TFut: Future<
Output = Result<(TConnInfo, TMuxer), PendingConnectionError<TTransErr>>
Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
> + Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
@ -256,7 +251,6 @@ where
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TPeerId: Clone + Send + 'static,
{
let endpoint = info.to_connected_point();
self.limits.check_incoming(|| self.iter_pending_incoming().count())?;
@ -272,14 +266,13 @@ where
&mut self,
future: TFut,
handler: THandler,
info: OutgoingInfo<'_, TPeerId>,
info: OutgoingInfo<'_>,
) -> Result<ConnectionId, ConnectionLimit>
where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TFut: Future<
Output = Result<(TConnInfo, TMuxer), PendingConnectionError<TTransErr>>
Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
> + Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
@ -293,7 +286,6 @@ where
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TPeerId: Clone + Send + 'static,
{
self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;
@ -312,14 +304,13 @@ where
future: TFut,
handler: THandler,
endpoint: ConnectedPoint,
peer: Option<TPeerId>,
peer: Option<PeerId>,
) -> ConnectionId
where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TFut: Future<
Output = Result<(TConnInfo, TMuxer), PendingConnectionError<TTransErr>>
Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
> + Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
@ -333,7 +324,6 @@ where
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TPeerId: Clone + Send + 'static,
{
// Validate the received peer ID as the last step of the pending connection
// future, so that these errors can be raised before the `handler` is consumed
@ -343,18 +333,18 @@ where
let endpoint = endpoint.clone();
let expected_peer = peer.clone();
let local_id = self.local_id.clone();
move |(info, muxer)| {
move |(peer_id, muxer)| {
if let Some(peer) = expected_peer {
if &peer != info.peer_id() {
if peer != peer_id {
return future::err(PendingConnectionError::InvalidPeerId)
}
}
if &local_id == info.peer_id() {
if local_id == peer_id {
return future::err(PendingConnectionError::InvalidPeerId)
}
let connected = Connected { info, endpoint };
let connected = Connected { peer_id, endpoint };
future::ready(Ok((connected, muxer)))
}
});
@ -369,10 +359,10 @@ where
/// Returns the assigned connection ID on success. An error is returned
/// if the configured maximum number of established connections for the
/// connected peer has been reached.
pub fn add<TMuxer>(&mut self, c: Connection<TMuxer, THandler::Handler>, i: Connected<TConnInfo>)
pub fn add<TMuxer>(&mut self, c: Connection<TMuxer, THandler::Handler>, i: Connected)
-> Result<ConnectionId, ConnectionLimit>
where
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = connection::Substream<TMuxer>,
InEvent = TInEvent,
@ -386,18 +376,15 @@ where
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Clone + Send + 'static,
TPeerId: Clone,
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
{
if let Some(limit) = self.limits.max_established_per_peer {
let current = self.num_peer_established(i.peer_id());
let current = self.num_peer_established(&i.peer_id);
if limit >= current {
return Err(ConnectionLimit { limit, current })
}
}
let id = self.manager.add(c, i.clone());
self.established.entry(i.peer_id().clone()).or_default().insert(id, i.endpoint);
self.established.entry(i.peer_id.clone()).or_default().insert(id, i.endpoint);
Ok(id)
}
@ -405,7 +392,7 @@ where
///
/// Returns `None` if the pool has no connection with the given ID.
pub fn get(&mut self, id: ConnectionId)
-> Option<PoolConnection<'_, TInEvent, TConnInfo, TPeerId>>
-> Option<PoolConnection<'_, TInEvent>>
{
match self.manager.entry(id) {
Some(manager::Entry::Established(entry)) =>
@ -423,7 +410,7 @@ where
/// Gets an established connection from the pool by ID.
pub fn get_established(&mut self, id: ConnectionId)
-> Option<EstablishedConnection<'_, TInEvent, TConnInfo>>
-> Option<EstablishedConnection<'_, TInEvent>>
{
match self.get(id) {
Some(PoolConnection::Established(c)) => Some(c),
@ -433,7 +420,7 @@ where
/// Gets a pending outgoing connection by ID.
pub fn get_outgoing(&mut self, id: ConnectionId)
-> Option<PendingConnection<'_, TInEvent, TConnInfo, TPeerId>>
-> Option<PendingConnection<'_, TInEvent>>
{
match self.pending.get(&id) {
Some((ConnectedPoint::Dialer { .. }, _peer)) =>
@ -452,7 +439,7 @@ where
/// Returns true if we are connected to the given peer.
///
/// This will return true only after a `NodeReached` event has been produced by `poll()`.
pub fn is_connected(&self, id: &TPeerId) -> bool {
pub fn is_connected(&self, id: &PeerId) -> bool {
self.established.contains_key(id)
}
@ -471,7 +458,7 @@ where
/// > **Note**: Established connections are dropped without performing
/// > an orderly close. See [`EstablishedConnection::start_close`] for
/// > performing such an orderly close.
pub fn disconnect(&mut self, peer: &TPeerId) {
pub fn disconnect(&mut self, peer: &PeerId) {
if let Some(conns) = self.established.get(peer) {
// Count upwards because we push to / pop from the end. See also `Pool::poll`.
let mut num_established = 0;
@ -518,28 +505,26 @@ where
}
/// Counts the number of established connections to the given peer.
pub fn num_peer_established(&self, peer: &TPeerId) -> usize {
pub fn num_peer_established(&self, peer: &PeerId) -> usize {
self.established.get(peer).map_or(0, |conns| conns.len())
}
/// Counts the number of pending outgoing connections to the given peer.
pub fn num_peer_outgoing(&self, peer: &TPeerId) -> usize {
pub fn num_peer_outgoing(&self, peer: &PeerId) -> usize {
self.iter_pending_outgoing()
.filter(|info| info.peer_id == Some(peer))
.count()
}
/// Returns an iterator over all established connections of `peer`.
pub fn iter_peer_established<'a>(&'a mut self, peer: &TPeerId)
pub fn iter_peer_established<'a>(&'a mut self, peer: &PeerId)
-> EstablishedConnectionIter<'a,
impl Iterator<Item = ConnectionId>,
TInEvent,
TOutEvent,
THandler,
TTransErr,
THandlerErr,
TConnInfo,
TPeerId>
THandlerErr>
{
let ids = self.iter_peer_established_info(peer)
.map(|(id, _endpoint)| *id)
@ -563,7 +548,7 @@ where
}
/// Returns an iterator for information on all pending outgoing connections.
pub fn iter_pending_outgoing(&self) -> impl Iterator<Item = OutgoingInfo<'_, TPeerId>> {
pub fn iter_pending_outgoing(&self) -> impl Iterator<Item = OutgoingInfo<'_>> {
self.iter_pending_info()
.filter_map(|(_, ref endpoint, ref peer_id)| {
match endpoint {
@ -576,7 +561,7 @@ where
/// Returns an iterator over all connection IDs and associated endpoints
/// of established connections to `peer` known to the pool.
pub fn iter_peer_established_info(&self, peer: &TPeerId)
pub fn iter_peer_established_info(&self, peer: &PeerId)
-> impl Iterator<Item = (&ConnectionId, &ConnectedPoint)> + fmt::Debug + '_
{
match self.established.get(peer) {
@ -588,14 +573,14 @@ where
/// Returns an iterator over all pending connection IDs together
/// with associated endpoints and expected peer IDs in the pool.
pub fn iter_pending_info(&self)
-> impl Iterator<Item = (&ConnectionId, &ConnectedPoint, &Option<TPeerId>)> + '_
-> impl Iterator<Item = (&ConnectionId, &ConnectedPoint, &Option<PeerId>)> + '_
{
self.pending.iter().map(|(id, (endpoint, info))| (id, endpoint, info))
}
/// Returns an iterator over all connected peers, i.e. those that have
/// at least one established connection in the pool.
pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a TPeerId> + 'a {
pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
self.established.keys()
}
@ -604,11 +589,8 @@ where
/// > **Note**: We use a regular `poll` method instead of implementing `Stream`,
/// > because we want the `Pool` to stay borrowed if necessary.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<
PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
> where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Clone,
TPeerId: Clone
{
PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
> {
// Drain events resulting from forced disconnections.
//
// Note: The `Disconnected` entries in `self.disconnected`
@ -650,14 +632,14 @@ where
},
manager::Event::ConnectionClosed { id, connected, error } => {
let num_established =
if let Some(conns) = self.established.get_mut(connected.peer_id()) {
if let Some(conns) = self.established.get_mut(&connected.peer_id) {
conns.remove(&id);
u32::try_from(conns.len()).unwrap()
} else {
0
};
if num_established == 0 {
self.established.remove(connected.peer_id());
self.established.remove(&connected.peer_id);
}
return Poll::Ready(PoolEvent::ConnectionClosed {
id, connected, error, num_established, pool: self
@ -668,7 +650,7 @@ where
if let Some((endpoint, peer)) = self.pending.remove(&id) {
// Check connection limit.
let established = &self.established;
let current = || established.get(entry.connected().peer_id())
let current = || established.get(&entry.connected().peer_id)
.map_or(0, |conns| conns.len());
if let Err(e) = self.limits.check_established(current) {
let connected = entry.remove();
@ -683,17 +665,17 @@ where
}
// Peer ID checks must already have happened. See `add_pending`.
if cfg!(debug_assertions) {
if &self.local_id == entry.connected().peer_id() {
if self.local_id == entry.connected().peer_id {
panic!("Unexpected local peer ID for remote.");
}
if let Some(peer) = peer {
if &peer != entry.connected().peer_id() {
if peer != entry.connected().peer_id {
panic!("Unexpected peer ID mismatch.");
}
}
}
// Add the connection to the pool.
let peer = entry.connected().peer_id().clone();
let peer = entry.connected().peer_id.clone();
let conns = self.established.entry(peer).or_default();
let num_established = NonZeroU32::new(u32::try_from(conns.len() + 1).unwrap())
.expect("n + 1 is always non-zero; qed");
@ -721,7 +703,7 @@ where
manager::Event::AddressChange { entry, new_endpoint, old_endpoint } => {
let id = entry.id();
match self.established.get_mut(entry.connected().peer_id()) {
match self.established.get_mut(&entry.connected().peer_id) {
Some(list) => *list.get_mut(&id)
.expect("state inconsistency: entry is `EstablishedEntry` but absent \
from `established`") = new_endpoint.clone(),
@ -745,19 +727,19 @@ where
}
/// A connection in a [`Pool`].
pub enum PoolConnection<'a, TInEvent, TConnInfo, TPeerId> {
Pending(PendingConnection<'a, TInEvent, TConnInfo, TPeerId>),
Established(EstablishedConnection<'a, TInEvent, TConnInfo>),
pub enum PoolConnection<'a, TInEvent> {
Pending(PendingConnection<'a, TInEvent>),
Established(EstablishedConnection<'a, TInEvent>),
}
/// A pending connection in a pool.
pub struct PendingConnection<'a, TInEvent, TConnInfo, TPeerId> {
entry: manager::PendingEntry<'a, TInEvent, TConnInfo>,
pending: &'a mut FnvHashMap<ConnectionId, (ConnectedPoint, Option<TPeerId>)>,
pub struct PendingConnection<'a, TInEvent> {
entry: manager::PendingEntry<'a, TInEvent>,
pending: &'a mut FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,
}
impl<TInEvent, TConnInfo, TPeerId>
PendingConnection<'_, TInEvent, TConnInfo, TPeerId>
impl<TInEvent>
PendingConnection<'_, TInEvent>
{
/// Returns the local connection ID.
pub fn id(&self) -> ConnectionId {
@ -765,7 +747,7 @@ impl<TInEvent, TConnInfo, TPeerId>
}
/// Returns the (expected) identity of the remote peer, if known.
pub fn peer_id(&self) -> &Option<TPeerId> {
pub fn peer_id(&self) -> &Option<PeerId> {
&self.pending.get(&self.entry.id()).expect("`entry` is a pending entry").1
}
@ -782,15 +764,14 @@ impl<TInEvent, TConnInfo, TPeerId>
}
/// An established connection in a pool.
pub struct EstablishedConnection<'a, TInEvent, TConnInfo> {
entry: manager::EstablishedEntry<'a, TInEvent, TConnInfo>,
pub struct EstablishedConnection<'a, TInEvent> {
entry: manager::EstablishedEntry<'a, TInEvent>,
}
impl<TInEvent, TConnInfo> fmt::Debug
for EstablishedConnection<'_, TInEvent, TConnInfo>
impl<TInEvent> fmt::Debug
for EstablishedConnection<'_, TInEvent>
where
TInEvent: fmt::Debug,
TConnInfo: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("EstablishedConnection")
@ -799,8 +780,8 @@ where
}
}
impl<TInEvent, TConnInfo> EstablishedConnection<'_, TInEvent, TConnInfo> {
pub fn connected(&self) -> &Connected<TConnInfo> {
impl<TInEvent> EstablishedConnection<'_, TInEvent> {
pub fn connected(&self) -> &Connected {
self.entry.connected()
}
@ -810,14 +791,12 @@ impl<TInEvent, TConnInfo> EstablishedConnection<'_, TInEvent, TConnInfo> {
}
/// Returns connection information obtained from the transport.
pub fn info(&self) -> &TConnInfo {
&self.entry.connected().info
pub fn info(&self) -> &PeerId {
&self.entry.connected().peer_id
}
}
impl<'a, TInEvent, TConnInfo> EstablishedConnection<'a, TInEvent, TConnInfo>
where
TConnInfo: ConnectionInfo,
impl<'a, TInEvent> EstablishedConnection<'a, TInEvent>
{
/// Returns the local connection ID.
pub fn id(&self) -> ConnectionId {
@ -825,8 +804,8 @@ where
}
/// Returns the identity of the connected peer.
pub fn peer_id(&self) -> &TConnInfo::PeerId {
self.info().peer_id()
pub fn peer_id(&self) -> &PeerId {
self.info()
}
/// (Asynchronously) sends an event to the connection handler.
@ -862,21 +841,21 @@ where
}
/// An iterator over established connections in a pool.
pub struct EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
pub struct EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
ids: I
}
// Note: Ideally this would be an implementation of `Iterator`, but that
// requires GATs (cf. https://github.com/rust-lang/rust/issues/44265) and
// a different definition of `Iterator`.
impl<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
impl<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
where
I: Iterator<Item = ConnectionId>
{
/// Obtains the next connection, if any.
pub fn next<'b>(&'b mut self) -> Option<EstablishedConnection<'b, TInEvent, TConnInfo>>
pub fn next<'b>(&'b mut self) -> Option<EstablishedConnection<'b, TInEvent>>
{
while let Some(id) = self.ids.next() {
if self.pool.manager.is_established(&id) { // (*)
@ -898,7 +877,7 @@ where
/// Returns the first connection, if any, consuming the iterator.
pub fn into_first<'b>(mut self)
-> Option<EstablishedConnection<'b, TInEvent, TConnInfo>>
-> Option<EstablishedConnection<'b, TInEvent>>
where 'a: 'b
{
while let Some(id) = self.ids.next() {
@ -969,9 +948,9 @@ impl PoolLimits {
/// Information about a former established connection to a peer
/// that was dropped via [`Pool::disconnect`].
struct Disconnected<TConnInfo> {
struct Disconnected {
id: ConnectionId,
connected: Connected<TConnInfo>,
connected: Connected,
/// The remaining number of established connections
/// to the same peer.
num_established: u32,

View File

@ -61,7 +61,7 @@ pub use identity::PublicKey;
pub use transport::Transport;
pub use translation::address_translation;
pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName};
pub use connection::{Connected, Endpoint, ConnectedPoint, ConnectionInfo};
pub use connection::{Connected, Endpoint, ConnectedPoint};
pub use network::Network;
use std::{future::Future, pin::Pin};

View File

@ -34,7 +34,6 @@ use crate::{
ConnectionId,
ConnectionLimit,
ConnectionHandler,
ConnectionInfo,
IntoConnectionHandler,
IncomingInfo,
OutgoingInfo,
@ -57,27 +56,26 @@ use std::{
convert::TryFrom as _,
error,
fmt,
hash::Hash,
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
};
/// Implementation of `Stream` that handles the nodes.
pub struct Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo = PeerId, TPeerId = PeerId>
pub struct Network<TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
THandler: IntoConnectionHandler,
{
/// The local peer ID.
local_peer_id: TPeerId,
local_peer_id: PeerId,
/// Listeners for incoming connections.
listeners: ListenersStream<TTrans>,
/// The nodes currently active.
pool: Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error, TConnInfo, TPeerId>,
<THandler::Handler as ConnectionHandler>::Error>,
/// The ongoing dialing attempts.
///
@ -91,16 +89,14 @@ where
/// > the same connection ID. This is ensured by the implementation of
/// > `Network` (see `dial_peer_impl` and `on_connection_failed`)
/// > together with the implementation of `DialingAttempt::abort`.
dialing: FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
dialing: FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
}
impl<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
Network<TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: fmt::Debug + Transport,
THandler: fmt::Debug + ConnectionHandler,
TConnInfo: fmt::Debug,
TPeerId: fmt::Debug + Eq + Hash,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("ReachAttempts")
@ -112,44 +108,40 @@ where
}
}
impl<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> Unpin for
Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<TTrans, TInEvent, TOutEvent, THandler> Unpin for
Network<TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
THandler: IntoConnectionHandler,
{
}
impl<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<TTrans, TInEvent, TOutEvent, THandler>
Network<TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
THandler: IntoConnectionHandler,
{
fn disconnect(&mut self, peer: &TPeerId) {
fn disconnect(&mut self, peer: &PeerId) {
self.pool.disconnect(peer);
self.dialing.remove(peer);
}
}
impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler, TConnInfo, TPeerId>
Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler>
Network<TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport + Clone,
TMuxer: StreamMuxer,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TPeerId: Eq + Hash + Clone,
{
/// Creates a new node events stream.
pub fn new(
transport: TTrans,
local_peer_id: TPeerId,
local_peer_id: PeerId,
config: NetworkConfig,
) -> Self {
let pool_local_id = local_peer_id.clone();
@ -217,7 +209,7 @@ where
}
/// Returns the peer id of the local node.
pub fn local_peer_id(&self) -> &TPeerId {
pub fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
}
@ -229,15 +221,13 @@ where
pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
-> Result<ConnectionId, ConnectionLimit>
where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TConnInfo: Send + 'static,
TPeerId: Send + 'static,
{
let info = OutgoingInfo { address, peer_id: None };
match self.transport().clone().dial(address.clone()) {
@ -285,29 +275,29 @@ where
/// Returns a list of all connected peers, i.e. peers to whom the `Network`
/// has at least one established connection.
pub fn connected_peers(&self) -> impl Iterator<Item = &TPeerId> {
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.pool.iter_connected()
}
/// Checks whether the network has an established connection to a peer.
pub fn is_connected(&self, peer: &TPeerId) -> bool {
pub fn is_connected(&self, peer: &PeerId) -> bool {
self.pool.is_connected(peer)
}
/// Checks whether the network has an ongoing dialing attempt to a peer.
pub fn is_dialing(&self, peer: &TPeerId) -> bool {
pub fn is_dialing(&self, peer: &PeerId) -> bool {
self.dialing.contains_key(peer)
}
/// Checks whether the network has neither an ongoing dialing attempt,
/// nor an established connection to a peer.
pub fn is_disconnected(&self, peer: &TPeerId) -> bool {
pub fn is_disconnected(&self, peer: &PeerId) -> bool {
!self.is_connected(peer) && !self.is_dialing(peer)
}
/// Returns a list of all the peers to whom a new outgoing connection
/// is currently being established.
pub fn dialing_peers(&self) -> impl Iterator<Item = &TPeerId> {
pub fn dialing_peers(&self) -> impl Iterator<Item = &PeerId> {
self.dialing.keys()
}
@ -328,8 +318,8 @@ where
}
/// Obtains a view of a [`Peer`] with the given ID in the network.
pub fn peer(&mut self, peer_id: TPeerId)
-> Peer<'_, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
pub fn peer(&mut self, peer_id: PeerId)
-> Peer<'_, TTrans, TInEvent, TOutEvent, THandler>
{
Peer::new(self, peer_id)
}
@ -347,10 +337,9 @@ where
where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TPeerId: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
{
@ -364,9 +353,9 @@ where
}
/// Provides an API similar to `Stream`, except that it cannot error.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>>
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>>
where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
@ -374,11 +363,9 @@ where
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TConnInfo: Clone,
TPeerId: Send + 'static,
{
// Poll the listener(s) for new connections.
match ListenersStream::poll(Pin::new(&mut self.listeners), cx) {
@ -469,17 +456,16 @@ where
}
/// Initiates a connection attempt to a known peer.
fn dial_peer(&mut self, opts: DialingOpts<TPeerId, THandler>)
fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>)
-> Result<ConnectionId, ConnectionLimit>
where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static,
TTrans::Error: Send + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TPeerId: Send + 'static,
{
dial_peer_impl(self.transport().clone(), &mut self.pool, &mut self.dialing, opts)
}
@ -487,23 +473,23 @@ where
/// Options for a dialing attempt (i.e. repeated connection attempt
/// via a list of address) to a peer.
struct DialingOpts<TPeerId, THandler> {
peer: TPeerId,
struct DialingOpts<PeerId, THandler> {
peer: PeerId,
handler: THandler,
address: Multiaddr,
remaining: Vec<Multiaddr>,
}
/// Standalone implementation of `Network::dial_peer` for more granular borrowing.
fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans, TConnInfo, TPeerId>(
fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
transport: TTrans,
pool: &mut Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error, TConnInfo, TPeerId>,
dialing: &mut FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<TPeerId, THandler>
<THandler::Handler as ConnectionHandler>::Error>,
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
opts: DialingOpts<PeerId, THandler>
) -> Result<ConnectionId, ConnectionLimit>
where
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
THandler::Handler: ConnectionHandler<
@ -511,15 +497,13 @@ where
InEvent = TInEvent,
OutEvent = TOutEvent,
> + Send + 'static,
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static,
TTrans::Error: error::Error + Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TPeerId: Eq + Hash + Send + Clone + 'static,
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
{
let result = match transport.dial(opts.address.clone()) {
Ok(fut) => {
@ -551,18 +535,16 @@ where
///
/// If the failed connection attempt was a dialing attempt and there
/// are more addresses to try, new `DialingOpts` are returned.
fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
dialing: &mut FnvHashMap<TPeerId, SmallVec<[peer::DialingState; 10]>>,
fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler>(
dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
id: ConnectionId,
endpoint: ConnectedPoint,
error: PendingConnectionError<TTrans::Error>,
handler: Option<THandler>,
) -> (Option<DialingOpts<TPeerId, THandler>>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>)
) -> (Option<DialingOpts<PeerId, THandler>>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>)
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TPeerId: Eq + Hash + Clone,
THandler: IntoConnectionHandler,
{
// Check if the failed connection is associated with a dialing attempt.
let dialing_failed = dialing.iter_mut()

View File

@ -34,14 +34,15 @@ use crate::{
PendingConnectionError,
},
transport::Transport,
PeerId
};
use std::{fmt, num::NonZeroU32};
/// Event that can happen on the `Network`.
pub enum NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
pub enum NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
THandler: IntoConnectionHandler,
{
/// One of the listeners gracefully closed.
ListenerClosed {
@ -104,7 +105,7 @@ where
/// A new connection to a peer has been established.
ConnectionEstablished {
/// The newly established connection.
connection: EstablishedConnection<'a, TInEvent, TConnInfo>,
connection: EstablishedConnection<'a, TInEvent>,
/// The total number of established connections to the same peer,
/// including the one that has just been opened.
num_established: NonZeroU32,
@ -126,7 +127,7 @@ where
/// The ID of the connection that encountered an error.
id: ConnectionId,
/// Information about the connection that encountered the error.
connected: Connected<TConnInfo>,
connected: Connected,
/// The error that occurred.
error: Option<ConnectionError<<THandler::Handler as ConnectionHandler>::Error>>,
/// The remaining number of established connections to the same peer.
@ -139,7 +140,7 @@ where
attempts_remaining: u32,
/// Id of the peer we were trying to dial.
peer_id: TPeerId,
peer_id: PeerId,
/// The multiaddr we failed to reach.
multiaddr: Multiaddr,
@ -160,7 +161,7 @@ where
/// An established connection produced an event.
ConnectionEvent {
/// The connection on which the event occurred.
connection: EstablishedConnection<'a, TInEvent, TConnInfo>,
connection: EstablishedConnection<'a, TInEvent>,
/// Event that was produced by the node.
event: TOutEvent,
},
@ -168,7 +169,7 @@ where
/// An established connection has changed its address.
AddressChange {
/// The connection whose address has changed.
connection: EstablishedConnection<'a, TInEvent, TConnInfo>,
connection: EstablishedConnection<'a, TInEvent>,
/// New endpoint of this connection.
new_endpoint: ConnectedPoint,
/// Old endpoint of this connection.
@ -176,17 +177,15 @@ where
},
}
impl<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
NetworkEvent<'_, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
NetworkEvent<'_, TTrans, TInEvent, TOutEvent, THandler>
where
TInEvent: fmt::Debug,
TOutEvent: fmt::Debug,
TTrans: Transport,
TTrans::Error: fmt::Debug,
THandler: IntoConnectionHandler<TConnInfo>,
THandler: IntoConnectionHandler,
<THandler::Handler as ConnectionHandler>::Error: fmt::Debug,
TConnInfo: fmt::Debug,
TPeerId: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {

View File

@ -26,7 +26,6 @@ use crate::{
Connected,
ConnectedPoint,
ConnectionHandler,
ConnectionInfo,
Connection,
ConnectionId,
ConnectionLimit,
@ -37,6 +36,7 @@ use crate::{
Substream,
pool::Pool,
},
PeerId
};
use fnv::FnvHashMap;
use smallvec::SmallVec;
@ -44,7 +44,6 @@ use std::{
collections::hash_map,
error,
fmt,
hash::Hash,
};
use super::{Network, DialingOpts};
@ -54,35 +53,33 @@ use super::{Network, DialingOpts};
/// > **Note**: In any state there may always be a pending incoming
/// > connection attempt from the peer, however, the remote identity
/// > of a peer is only known once a connection is fully established.
pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>
THandler: IntoConnectionHandler
{
/// At least one established connection exists to the peer.
Connected(ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>),
Connected(ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
/// There is an ongoing dialing (i.e. outgoing connection) attempt
/// to the peer. There may already be other established connections
/// to the peer.
Dialing(DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>),
Dialing(DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
/// There exists no established connection to the peer and there is
/// currently no ongoing dialing (i.e. outgoing connection) attempt
/// in progress.
Disconnected(DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>),
Disconnected(DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
/// The peer represents the local node.
Local,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId>,
TPeerId: fmt::Debug + Eq + Hash,
THandler: IntoConnectionHandler,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
@ -109,17 +106,15 @@ where
}
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<'a, TTrans, TInEvent, TOutEvent, THandler>
Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TPeerId: Eq + Hash,
TConnInfo: ConnectionInfo<PeerId = TPeerId>
THandler: IntoConnectionHandler,
{
pub(super) fn new(
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
peer_id: TPeerId
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
peer_id: PeerId
) -> Self {
if peer_id == network.local_peer_id {
return Peer::Local;
@ -138,43 +133,41 @@ where
fn disconnected(
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
peer_id: TPeerId
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
peer_id: PeerId
) -> Self {
Peer::Disconnected(DisconnectedPeer { network, peer_id })
}
fn connected(
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
peer_id: TPeerId
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
peer_id: PeerId
) -> Self {
Peer::Connected(ConnectedPeer { network, peer_id })
}
fn dialing(
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
peer_id: TPeerId
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
peer_id: PeerId
) -> Self {
Peer::Dialing(DialingPeer { network, peer_id })
}
}
impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler>
Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TPeerId: Eq + Hash + Clone + Send + 'static,
{
/// Checks whether the peer is currently connected.
///
@ -219,7 +212,7 @@ where
/// attempt to the first address fails.
pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler)
-> Result<
(ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>),
(ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
ConnectionLimit
>
where
@ -246,7 +239,7 @@ where
///
/// Succeeds if the there is at least one established connection to the peer.
pub fn into_connected(self) -> Option<
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
> {
match self {
Peer::Connected(peer) => Some(peer),
@ -260,7 +253,7 @@ where
///
/// Succeeds if the there is at least one pending outgoing connection to the peer.
pub fn into_dialing(self) -> Option<
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
> {
match self {
Peer::Dialing(peer) => Some(peer),
@ -273,7 +266,7 @@ where
/// Converts the peer into a `DisconnectedPeer`, if neither an established connection
/// nor a dialing attempt exists.
pub fn into_disconnected(self) -> Option<
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
> {
match self {
Peer::Disconnected(peer) => Some(peer),
@ -285,35 +278,33 @@ where
/// The representation of a peer in a [`Network`] to whom at least
/// one established connection exists. There may also be additional ongoing
/// dialing attempts to the peer.
pub struct ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
pub struct ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
THandler: IntoConnectionHandler,
{
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
peer_id: TPeerId,
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
peer_id: PeerId,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<'a, TTrans, TInEvent, TOutEvent, THandler>
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
THandler: IntoConnectionHandler,
{
pub fn id(&self) -> &TPeerId {
pub fn id(&self) -> &PeerId {
&self.peer_id
}
/// Returns the `ConnectedPeer` into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> {
Peer::Connected(self)
}
/// Obtains an established connection to the peer by ID.
pub fn connection<'b>(&'b mut self, id: ConnectionId)
-> Option<EstablishedConnection<'b, TInEvent, TConnInfo>>
-> Option<EstablishedConnection<'b, TInEvent>>
{
self.network.pool.get_established(id)
}
@ -333,7 +324,7 @@ where
/// Converts this peer into a [`DialingPeer`], if there is an ongoing
/// dialing attempt, `None` otherwise.
pub fn into_dialing(self) -> Option<
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
> {
if self.network.dialing.contains_key(&self.peer_id) {
Some(DialingPeer { network: self.network, peer_id: self.peer_id })
@ -350,16 +341,14 @@ where
TOutEvent,
THandler,
TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error,
TConnInfo,
TPeerId>
<THandler::Handler as ConnectionHandler>::Error>
{
self.network.pool.iter_peer_established(&self.peer_id)
}
/// Obtains some established connection to the peer.
pub fn some_connection<'b>(&'b mut self)
-> EstablishedConnection<'b, TInEvent, TConnInfo>
-> EstablishedConnection<'b, TInEvent>
{
self.connections()
.into_first()
@ -368,19 +357,18 @@ where
/// Disconnects from the peer, closing all connections.
pub fn disconnect(self)
-> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
-> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
{
self.network.disconnect(&self.peer_id);
DisconnectedPeer { network: self.network, peer_id: self.peer_id }
}
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TPeerId: Eq + Hash + fmt::Debug,
THandler: IntoConnectionHandler,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("ConnectedPeer")
@ -394,36 +382,34 @@ where
/// The representation of a peer in a [`Network`] to whom a dialing
/// attempt is ongoing. There may already exist other established
/// connections to this peer.
pub struct DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
pub struct DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
THandler: IntoConnectionHandler,
{
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
peer_id: TPeerId,
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
peer_id: PeerId,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<'a, TTrans, TInEvent, TOutEvent, THandler>
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
THandler: IntoConnectionHandler,
{
pub fn id(&self) -> &TPeerId {
pub fn id(&self) -> &PeerId {
&self.peer_id
}
/// Returns the `DialingPeer` into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> {
Peer::Dialing(self)
}
/// Disconnects from this peer, closing all established connections and
/// aborting all dialing attempts.
pub fn disconnect(self)
-> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
-> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
{
self.network.disconnect(&self.peer_id);
DisconnectedPeer { network: self.network, peer_id: self.peer_id }
@ -438,7 +424,7 @@ where
/// Converts the peer into a `ConnectedPeer`, if an established connection exists.
pub fn into_connected(self)
-> Option<ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>>
-> Option<ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>>
{
if self.is_connected() {
Some(ConnectedPeer { peer_id: self.peer_id, network: self.network })
@ -450,7 +436,7 @@ where
/// Obtains a dialing attempt to the peer by connection ID of
/// the current connection attempt.
pub fn attempt<'b>(&'b mut self, id: ConnectionId)
-> Option<DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>>
-> Option<DialingAttempt<'b, TInEvent>>
{
if let hash_map::Entry::Occupied(attempts) = self.network.dialing.entry(self.peer_id.clone()) {
if let Some(pos) = attempts.get().iter().position(|s| s.current.0 == id) {
@ -475,9 +461,7 @@ where
TOutEvent,
THandler,
TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error,
TConnInfo,
TPeerId>
<THandler::Handler as ConnectionHandler>::Error>
{
DialingAttemptIter::new(&self.peer_id, &mut self.network.pool, &mut self.network.dialing)
}
@ -486,7 +470,7 @@ where
///
/// At least one dialing connection is guaranteed to exist on a `DialingPeer`.
pub fn some_attempt<'b>(&'b mut self)
-> DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>
-> DialingAttempt<'b, TInEvent>
{
self.attempts()
.into_first()
@ -494,12 +478,11 @@ where
}
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TPeerId: Eq + Hash + fmt::Debug,
THandler: IntoConnectionHandler,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("DialingPeer")
@ -513,21 +496,20 @@ where
/// The representation of a peer to whom the `Network` has currently
/// neither an established connection, nor an ongoing dialing attempt
/// initiated by the local peer.
pub struct DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
pub struct DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
THandler: IntoConnectionHandler,
{
peer_id: TPeerId,
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
peer_id: PeerId,
network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> fmt::Debug for
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TPeerId: fmt::Debug,
THandler: IntoConnectionHandler,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("DisconnectedPeer")
@ -536,18 +518,18 @@ where
}
}
impl<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
impl<'a, TTrans, TInEvent, TOutEvent, THandler>
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
THandler: IntoConnectionHandler,
{
pub fn id(&self) -> &TPeerId {
pub fn id(&self) -> &PeerId {
&self.peer_id
}
/// Returns the `DisconnectedPeer` into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> {
Peer::Disconnected(self)
}
@ -558,13 +540,13 @@ where
///
/// # Panics
///
/// Panics if `connected.peer_id()` does not identify the current peer.
/// Panics if `connected.peer_id` does not identify the current peer.
pub fn set_connected<TMuxer>(
self,
connected: Connected<TConnInfo>,
connected: Connected,
connection: Connection<TMuxer, THandler::Handler>,
) -> Result<
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
ConnectionLimit
> where
TInEvent: Send + 'static,
@ -574,13 +556,11 @@ where
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Clone + Send + 'static,
TPeerId: Eq + Hash + Clone + Send + fmt::Debug + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
{
if connected.peer_id() != &self.peer_id {
panic!("Invalid peer ID given: {:?}. Expected: {:?}", connected.peer_id(), self.peer_id)
if connected.peer_id != self.peer_id {
panic!("Invalid peer ID given: {:?}. Expected: {:?}", connected.peer_id, self.peer_id)
}
self.network.pool.add(connection, connected)
@ -604,17 +584,17 @@ pub(super) struct DialingState {
/// A `DialingAttempt` is an ongoing outgoing connection attempt to
/// a known / expected remote peer ID and a list of alternative addresses
/// to connect to, if the current connection attempt fails.
pub struct DialingAttempt<'a, TInEvent, TConnInfo, TPeerId> {
pub struct DialingAttempt<'a, TInEvent> {
/// The underlying pending connection in the `Pool`.
inner: PendingConnection<'a, TInEvent, TConnInfo, TPeerId>,
inner: PendingConnection<'a, TInEvent>,
/// All current dialing attempts of the peer.
attempts: hash_map::OccupiedEntry<'a, TPeerId, SmallVec<[DialingState; 10]>>,
attempts: hash_map::OccupiedEntry<'a, PeerId, SmallVec<[DialingState; 10]>>,
/// The position of the current `DialingState` of this connection in the `attempts`.
pos: usize,
}
impl<'a, TInEvent, TConnInfo, TPeerId>
DialingAttempt<'a, TInEvent, TConnInfo, TPeerId>
impl<'a, TInEvent>
DialingAttempt<'a, TInEvent>
{
/// Returns the ID of the current connection attempt.
pub fn id(&self) -> ConnectionId {
@ -622,7 +602,7 @@ impl<'a, TInEvent, TConnInfo, TPeerId>
}
/// Returns the (expected) peer ID of the dialing attempt.
pub fn peer_id(&self) -> &TPeerId {
pub fn peer_id(&self) -> &PeerId {
self.attempts.key()
}
@ -658,17 +638,17 @@ impl<'a, TInEvent, TConnInfo, TPeerId>
}
/// An iterator over the ongoing dialing attempts to a peer.
pub struct DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {
pub struct DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
/// The peer whose dialing attempts are being iterated.
peer_id: &'a TPeerId,
peer_id: &'a PeerId,
/// The underlying connection `Pool` of the `Network`.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
/// The state of all current dialing attempts known to the `Network`.
///
/// Ownership of the `OccupiedEntry` for `peer_id` containing all attempts must be
/// borrowed to each `DialingAttempt` in order for it to remove the entry if the
/// last dialing attempt is aborted.
dialing: &'a mut FnvHashMap<TPeerId, SmallVec<[DialingState; 10]>>,
dialing: &'a mut FnvHashMap<PeerId, SmallVec<[DialingState; 10]>>,
/// The current position of the iterator in `dialing[peer_id]`.
pos: usize,
/// The total number of elements in `dialing[peer_id]` to iterate over.
@ -678,23 +658,20 @@ pub struct DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THan
// Note: Ideally this would be an implementation of `Iterator`, but that
// requires GATs (cf. https://github.com/rust-lang/rust/issues/44265) and
// a different definition of `Iterator`.
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
where
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
{
fn new(
peer_id: &'a TPeerId,
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
dialing: &'a mut FnvHashMap<TPeerId, SmallVec<[DialingState; 10]>>,
peer_id: &'a PeerId,
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
dialing: &'a mut FnvHashMap<PeerId, SmallVec<[DialingState; 10]>>,
) -> Self {
let end = dialing.get(peer_id).map_or(0, |conns| conns.len());
Self { pos: 0, end, pool, dialing, peer_id }
}
/// Obtains the next dialing connection, if any.
pub fn next<'b>(&'b mut self) -> Option<DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>> {
pub fn next<'b>(&'b mut self) -> Option<DialingAttempt<'b, TInEvent>> {
if self.pos == self.end {
return None
}
@ -713,7 +690,7 @@ where
/// Returns the first connection, if any, consuming the iterator.
pub fn into_first<'b>(self)
-> Option<DialingAttempt<'b, TInEvent, TConnInfo, TPeerId>>
-> Option<DialingAttempt<'b, TInEvent>>
where 'a: 'b
{
if self.pos == self.end {

View File

@ -24,7 +24,6 @@ pub use crate::upgrade::Version;
use crate::{
ConnectedPoint,
ConnectionInfo,
Negotiated,
transport::{
Transport,
@ -44,7 +43,8 @@ use crate::{
UpgradeError,
OutboundUpgradeApply,
InboundUpgradeApply
}
},
PeerId
};
use futures::{prelude::*, ready};
use multiaddr::Multiaddr;
@ -73,7 +73,7 @@ use std::{
/// 2. Authentication must precede the negotiation of a multiplexer.
/// 3. Applying a multiplexer is the last step in the upgrade process.
/// 4. The [`Transport::Output`] conforms to the requirements of a [`Network`],
/// namely a tuple of a [`ConnectionInfo`] (from the authentication upgrade) and a
/// namely a tuple of a [`PeerId`] (from the authentication upgrade) and a
/// [`StreamMuxer`] (from the multiplexing upgrade).
///
/// [`Network`]: crate::Network
@ -96,25 +96,23 @@ where
/// Upgrades the transport to perform authentication of the remote.
///
/// The supplied upgrade receives the I/O resource `C` and must
/// produce a pair `(I, D)`, where `I` is a [`ConnectionInfo`] and
/// `D` is a new I/O resource. The upgrade must thus at a minimum
/// identify the remote, which typically involves the use of a
/// cryptographic authentication protocol in the context of establishing
/// a secure channel.
/// produce a pair `(PeerId, D)`, where `D` is a new I/O resource.
/// The upgrade must thus at a minimum identify the remote, which typically
/// involves the use of a cryptographic authentication protocol in the
/// context of establishing a secure channel.
///
/// ## Transitions
///
/// * I/O upgrade: `C -> (I, D)`.
/// * Transport output: `C -> (I, D)`
pub fn authenticate<C, D, U, I, E>(self, upgrade: U) -> Authenticated<
/// * I/O upgrade: `C -> (PeerId, D)`.
/// * Transport output: `C -> (PeerId, D)`
pub fn authenticate<C, D, U, E>(self, upgrade: U) -> Authenticated<
AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>
> where
T: Transport<Output = C>,
I: ConnectionInfo,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = (I, D), Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = (I, D), Error = E> + Clone,
U: InboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
E: Error + 'static,
{
let version = self.version;
@ -161,23 +159,23 @@ where
///
/// Configured through [`Authenticated::multiplex`].
#[pin_project::pin_project]
pub struct Multiplex<C, U, I>
pub struct Multiplex<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
{
info: Option<I>,
peer_id: Option<PeerId>,
#[pin]
upgrade: EitherUpgrade<C, U>,
}
impl<C, U, I, M, E> Future for Multiplex<C, U, I>
impl<C, U, M, E> Future for Multiplex<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E>
{
type Output = Result<(I, M), UpgradeError<E>>;
type Output = Result<(PeerId, M), UpgradeError<E>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
@ -185,7 +183,7 @@ where
Ok(m) => m,
Err(err) => return Poll::Ready(Err(err)),
};
let i = this.info.take().expect("Multiplex future polled after completion.");
let i = this.peer_id.take().expect("Multiplex future polled after completion.");
Poll::Ready(Ok((i, m)))
}
}
@ -208,13 +206,12 @@ where
/// ## Transitions
///
/// * I/O upgrade: `C -> D`.
/// * Transport output: `(I, C) -> (I, D)`.
pub fn apply<C, D, U, I, E>(self, upgrade: U) -> Authenticated<Upgrade<T, U>>
/// * Transport output: `(PeerId, C) -> (PeerId, D)`.
pub fn apply<C, D, U, E>(self, upgrade: U) -> Authenticated<Upgrade<T, U>>
where
T: Transport<Output = (I, C)>,
T: Transport<Output = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
E: Error + 'static,
@ -231,14 +228,13 @@ where
/// ## Transitions
///
/// * I/O upgrade: `C -> M`.
/// * Transport output: `(I, C) -> (I, M)`.
pub fn multiplex<C, M, U, I, E>(self, upgrade: U) -> Multiplexed<
AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
/// * Transport output: `(PeerId, C) -> (PeerId, M)`.
pub fn multiplex<C, M, U, E>(self, upgrade: U) -> Multiplexed<
AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>
> where
T: Transport<Output = (I, C)>,
T: Transport<Output = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
@ -246,13 +242,13 @@ where
let version = self.0.version;
Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, upgrade, endpoint, version);
Multiplex { info: Some(i), upgrade }
Multiplex { peer_id: Some(i), upgrade }
}))
}
/// Like [`Authenticated::multiplex`] but accepts a function which returns the upgrade.
///
/// The supplied function is applied to [`ConnectionInfo`] and [`ConnectedPoint`]
/// The supplied function is applied to [`PeerId`] and [`ConnectedPoint`]
/// and returns an upgrade which receives the I/O resource `C` and must
/// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
/// This ends the (regular) transport upgrade process.
@ -260,23 +256,22 @@ where
/// ## Transitions
///
/// * I/O upgrade: `C -> M`.
/// * Transport output: `(I, C) -> (I, M)`.
pub fn multiplex_ext<C, M, U, I, E, F>(self, up: F) -> Multiplexed<
AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
/// * Transport output: `(PeerId, C) -> (PeerId, M)`.
pub fn multiplex_ext<C, M, U, E, F>(self, up: F) -> Multiplexed<
AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>
> where
T: Transport<Output = (I, C)>,
T: Transport<Output = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
F: for<'a> FnOnce(&'a I, &'a ConnectedPoint) -> U + Clone
F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone
{
let version = self.0.version;
Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, up(&i, &endpoint), endpoint, version);
Multiplex { info: Some(i), upgrade }
Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| {
let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version);
Multiplex { peer_id: Some(peer_id), upgrade }
}))
}
}
@ -289,14 +284,13 @@ pub struct Multiplexed<T>(T);
impl<T> Multiplexed<T> {
/// Boxes the authenticated, multiplexed transport, including
/// the [`StreamMuxer`] and custom transport errors.
pub fn boxed<I, M>(self) -> super::Boxed<(I, StreamMuxerBox)>
pub fn boxed<M>(self) -> super::Boxed<(PeerId, StreamMuxerBox)>
where
T: Transport<Output = (I, M)> + Sized + Clone + Send + Sync + 'static,
T: Transport<Output = (PeerId, M)> + Sized + Clone + Send + Sync + 'static,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
T::Error: Send + Sync,
I: ConnectionInfo,
M: StreamMuxer + Send + Sync + 'static,
M::Substream: Send + 'static,
M::OutboundSubstream: Send + 'static
@ -357,20 +351,20 @@ impl<T, U> Upgrade<T, U> {
}
}
impl<T, C, D, U, I, E> Transport for Upgrade<T, U>
impl<T, C, D, U, E> Transport for Upgrade<T, U>
where
T: Transport<Output = (I, C)>,
T: Transport<Output = (PeerId, C)>,
T::Error: 'static,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
E: Error + 'static
{
type Output = (I, D);
type Output = (PeerId, D);
type Error = TransportUpgradeError<T::Error, E>;
type Listener = ListenerStream<T::Listener, U>;
type ListenerUpgrade = ListenerUpgradeFuture<T::ListenerUpgrade, U, I, C>;
type Dial = DialUpgradeFuture<T::Dial, U, I, C>;
type ListenerUpgrade = ListenerUpgradeFuture<T::ListenerUpgrade, U, C>;
type Dial = DialUpgradeFuture<T::Dial, U, C>;
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self.inner.dial(addr.clone())
@ -427,23 +421,23 @@ where
}
/// The [`Transport::Dial`] future of an [`Upgrade`]d transport.
pub struct DialUpgradeFuture<F, U, I, C>
pub struct DialUpgradeFuture<F, U, C>
where
U: OutboundUpgrade<Negotiated<C>>,
C: AsyncRead + AsyncWrite + Unpin,
{
future: Pin<Box<F>>,
upgrade: future::Either<Option<U>, (Option<I>, OutboundUpgradeApply<C, U>)>
upgrade: future::Either<Option<U>, (Option<PeerId>, OutboundUpgradeApply<C, U>)>
}
impl<F, U, I, C, D> Future for DialUpgradeFuture<F, U, I, C>
impl<F, U, C, D> Future for DialUpgradeFuture<F, U, C>
where
F: TryFuture<Ok = (I, C)>,
F: TryFuture<Ok = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundUpgrade<Negotiated<C>, Output = D>,
U::Error: Error
{
type Output = Result<(I, D), TransportUpgradeError<F::Error, U::Error>>;
type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// We use a `this` variable because the compiler can't mutably borrow multiple times
@ -473,7 +467,7 @@ where
}
}
impl<F, U, I, C> Unpin for DialUpgradeFuture<F, U, I, C>
impl<F, U, C> Unpin for DialUpgradeFuture<F, U, C>
where
U: OutboundUpgrade<Negotiated<C>>,
C: AsyncRead + AsyncWrite + Unpin,
@ -486,14 +480,14 @@ pub struct ListenerStream<S, U> {
upgrade: U
}
impl<S, U, F, I, C, D, E> Stream for ListenerStream<S, U>
impl<S, U, F, C, D, E> Stream for ListenerStream<S, U>
where
S: TryStream<Ok = ListenerEvent<F, E>, Error = E>,
F: TryFuture<Ok = (I, C)>,
F: TryFuture<Ok = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D> + Clone
{
type Item = Result<ListenerEvent<ListenerUpgradeFuture<F, U, I, C>, TransportUpgradeError<E, U::Error>>, TransportUpgradeError<E, U::Error>>;
type Item = Result<ListenerEvent<ListenerUpgradeFuture<F, U, C>, TransportUpgradeError<E, U::Error>>, TransportUpgradeError<E, U::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(TryStream::try_poll_next(self.stream.as_mut(), cx)) {
@ -520,23 +514,23 @@ impl<S, U> Unpin for ListenerStream<S, U> {
}
/// The [`Transport::ListenerUpgrade`] future of an [`Upgrade`]d transport.
pub struct ListenerUpgradeFuture<F, U, I, C>
pub struct ListenerUpgradeFuture<F, U, C>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>>
{
future: Pin<Box<F>>,
upgrade: future::Either<Option<U>, (Option<I>, InboundUpgradeApply<C, U>)>
upgrade: future::Either<Option<U>, (Option<PeerId>, InboundUpgradeApply<C, U>)>
}
impl<F, U, I, C, D> Future for ListenerUpgradeFuture<F, U, I, C>
impl<F, U, C, D> Future for ListenerUpgradeFuture<F, U, C>
where
F: TryFuture<Ok = (I, C)>,
F: TryFuture<Ok = (PeerId, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D>,
U::Error: Error
{
type Output = Result<(I, D), TransportUpgradeError<F::Error, U::Error>>;
type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// We use a `this` variable because the compiler can't mutably borrow multiple times
@ -566,7 +560,7 @@ where
}
}
impl<F, U, I, C> Unpin for ListenerUpgradeFuture<F, U, I, C>
impl<F, U, C> Unpin for ListenerUpgradeFuture<F, U, C>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>>

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.1 [2020-10-28]
- Be lenient with duplicate `Close` frames received. Version

View File

@ -2,7 +2,7 @@
name = "libp2p-mplex"
edition = "2018"
description = "Mplex multiplexing protocol for libp2p"
version = "0.23.1"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
bytes = "0.5"
futures = "0.3.1"
futures_codec = "0.4"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-core = { version = "0.24.0", path = "../../core" }
log = "0.4"
nohash-hasher = "0.2"
parking_lot = "0.11"

View File

@ -1,3 +1,7 @@
# 0.27.0 [unreleased]
- Update dependencies.
# 0.26.0 [2020-10-16]
- Update `libp2p-core`.

View File

@ -2,7 +2,7 @@
name = "libp2p-yamux"
edition = "2018"
description = "Yamux multiplexing protocol for libp2p"
version = "0.26.0"
version = "0.27.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-core = { version = "0.24.0", path = "../../core" }
parking_lot = "0.11"
thiserror = "1.0"
yamux = "0.8.0"

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Bump `libp2p-core` dependency.

View File

@ -2,7 +2,7 @@
name = "libp2p-deflate"
edition = "2018"
description = "Deflate encryption protocol for libp2p"
version = "0.23.0"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-core = { version = "0.24.0", path = "../../core" }
flate2 = "1.0"
[dev-dependencies]

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Update dependencies.

View File

@ -2,7 +2,7 @@
name = "libp2p-floodsub"
edition = "2018"
description = "Floodsub protocol for libp2p"
version = "0.23.0"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -13,8 +13,8 @@ categories = ["network-programming", "asynchronous"]
cuckoofilter = "0.5.0"
fnv = "1.0"
futures = "0.3.1"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-swarm = { version = "0.23.0", path = "../../swarm" }
libp2p-core = { version = "0.24.0", path = "../../core" }
libp2p-swarm = { version = "0.24.0", path = "../../swarm" }
log = "0.4"
prost = "0.6.1"
rand = "0.7"

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Update dependencies.

View File

@ -2,7 +2,7 @@
name = "libp2p-gossipsub"
edition = "2018"
description = "Gossipsub protocol for libp2p"
version = "0.23.0"
version = "0.24.0"
authors = ["Age Manning <Age@AgeManning.com>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -10,8 +10,8 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[dependencies]
libp2p-swarm = { version = "0.23.0", path = "../../swarm" }
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-swarm = { version = "0.24.0", path = "../../swarm" }
libp2p-core = { version = "0.24.0", path = "../../core" }
bytes = "0.5.4"
byteorder = "1.3.2"
fnv = "1.0.6"

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Update `libp2p-swarm` and `libp2p-core`.

View File

@ -2,7 +2,7 @@
name = "libp2p-identify"
edition = "2018"
description = "Nodes identifcation protocol for libp2p"
version = "0.23.0"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,8 +11,8 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-swarm = { version = "0.23.0", path = "../../swarm" }
libp2p-core = { version = "0.24.0", path = "../../core" }
libp2p-swarm = { version = "0.24.0", path = "../../swarm" }
log = "0.4.1"
prost = "0.6.1"
smallvec = "1.0"

View File

@ -1,3 +1,7 @@
# 0.25.0 [unreleased]
- Update dependencies.
# 0.24.0 [2020-10-16]
- Update `libp2p-core` and `libp2p-swarm`.

View File

@ -2,7 +2,7 @@
name = "libp2p-kad"
edition = "2018"
description = "Kademlia protocol for libp2p"
version = "0.24.0"
version = "0.25.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -17,8 +17,8 @@ fnv = "1.0"
futures_codec = "0.4"
futures = "0.3.1"
log = "0.4"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-swarm = { version = "0.23.0", path = "../../swarm" }
libp2p-core = { version = "0.24.0", path = "../../core" }
libp2p-swarm = { version = "0.24.0", path = "../../swarm" }
multihash = "0.11.0"
prost = "0.6.1"
rand = "0.7.2"

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Update `libp2p-swarm` and `libp2p-core`.

View File

@ -1,7 +1,7 @@
[package]
name = "libp2p-mdns"
edition = "2018"
version = "0.23.0"
version = "0.24.0"
description = "Implementation of the libp2p mDNS discovery method"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
@ -16,8 +16,8 @@ dns-parser = "0.8"
either = "1.5.3"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-swarm = { version = "0.23.0", path = "../../swarm" }
libp2p-core = { version = "0.24.0", path = "../../core" }
libp2p-swarm = { version = "0.24.0", path = "../../swarm" }
log = "0.4"
net2 = "0.2"
rand = "0.7"

View File

@ -1,3 +1,7 @@
# 0.26.0 [unreleased]
- Update dependencies.
# 0.25.0 [2020-10-16]
- Update dependencies.

View File

@ -1,7 +1,7 @@
[package]
name = "libp2p-noise"
description = "Cryptographic handshake protocol using the noise framework."
version = "0.25.0"
version = "0.26.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -12,7 +12,7 @@ bytes = "0.5"
curve25519-dalek = "3.0.0"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-core = { version = "0.24.0", path = "../../core" }
log = "0.4"
prost = "0.6.1"
rand = "0.7.2"

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Update `libp2p-swarm` and `libp2p-core`.

View File

@ -2,7 +2,7 @@
name = "libp2p-ping"
edition = "2018"
description = "Ping protocol for libp2p"
version = "0.23.0"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,8 +11,8 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-swarm = { version = "0.23.0", path = "../../swarm" }
libp2p-core = { version = "0.24.0", path = "../../core" }
libp2p-swarm = { version = "0.24.0", path = "../../swarm" }
log = "0.4.1"
rand = "0.7.2"
void = "1.0"

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Improve error logging

View File

@ -2,7 +2,7 @@
name = "libp2p-plaintext"
edition = "2018"
description = "Plaintext encryption dummy protocol for libp2p"
version = "0.23.0"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
bytes = "0.5"
futures = "0.3.1"
futures_codec = "0.4.0"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-core = { version = "0.24.0", path = "../../core" }
log = "0.4.8"
prost = "0.6.1"
unsigned-varint = { version = "0.5.1", features = ["futures-codec"] }

View File

@ -1,3 +1,7 @@
# 0.5.0 [unreleased]
- Update dependencies.
# 0.4.0 [2020-10-16]
- Update dependencies.

View File

@ -2,7 +2,7 @@
name = "libp2p-request-response"
edition = "2018"
description = "Generic Request/Response Protocols"
version = "0.4.0"
version = "0.5.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -13,8 +13,8 @@ categories = ["network-programming", "asynchronous"]
async-trait = "0.1"
bytes = "0.5.6"
futures = "0.3.1"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-swarm = { version = "0.23.0", path = "../../swarm" }
libp2p-core = { version = "0.24.0", path = "../../core" }
libp2p-swarm = { version = "0.24.0", path = "../../swarm" }
log = "0.4.11"
lru = "0.6"
minicbor = { version = "0.6", features = ["std", "derive"] }

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Update dependencies.

View File

@ -2,7 +2,7 @@
name = "libp2p-secio"
edition = "2018"
description = "Secio encryption protocol for libp2p"
version = "0.23.0"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -19,7 +19,7 @@ ctr = "0.3"
futures = "0.3.1"
hmac = "0.9.0"
lazy_static = "1.2.0"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-core = { version = "0.24.0", path = "../../core" }
log = "0.4.6"
prost = "0.6.1"
pin-project = "1.0.0"

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Require a `Boxed` transport to be given to the `Swarm`

View File

@ -2,7 +2,7 @@
name = "libp2p-swarm"
edition = "2018"
description = "The libp2p swarm"
version = "0.23.0"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
either = "1.6.0"
futures = "0.3.1"
libp2p-core = { version = "0.23.0", path = "../core" }
libp2p-core = { version = "0.24.0", path = "../core" }
log = "0.4"
rand = "0.7"
smallvec = "1.0"

View File

@ -101,7 +101,6 @@ use libp2p_core::{
connection::{
ConnectionError,
ConnectionId,
ConnectionInfo,
ConnectionLimit,
ConnectedPoint,
EstablishedConnection,
@ -123,18 +122,17 @@ use libp2p_core::{
};
use registry::{Addresses, AddressIntoIter};
use smallvec::SmallVec;
use std::{error, fmt, hash::Hash, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
use std::{error, fmt, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroUsize};
use upgrade::UpgradeInfoSend as _;
/// Contains the state of the network, plus the way it should behave.
pub type Swarm<TBehaviour, TConnInfo = PeerId> = ExpandedSwarm<
pub type Swarm<TBehaviour> = ExpandedSwarm<
TBehaviour,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
<TBehaviour as NetworkBehaviour>::ProtocolsHandler,
TConnInfo,
>;
/// Substream for which a protocol has been chosen.
@ -255,18 +253,15 @@ pub enum SwarmEvent<TBvEv, THandleErr> {
}
/// Contains the state of the network, plus the way it should behave.
pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo = PeerId>
pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where
THandler: IntoProtocolsHandler,
TConnInfo: ConnectionInfo<PeerId = PeerId>,
{
network: Network<
transport::Boxed<(TConnInfo, StreamMuxerBox)>,
transport::Boxed<(PeerId, StreamMuxerBox)>,
TInEvent,
TOutEvent,
NodeHandlerWrapperBuilder<THandler>,
TConnInfo,
PeerId,
>,
/// Handles which nodes to connect to and how to handle the events sent back by the protocol
@ -292,11 +287,10 @@ where
pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> Deref for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
impl<TBehaviour, TInEvent, TOutEvent, THandler> Deref for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where
THandler: IntoProtocolsHandler,
TConnInfo: ConnectionInfo<PeerId = PeerId>,
{
type Target = TBehaviour;
@ -305,38 +299,35 @@ where
}
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> DerefMut for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
impl<TBehaviour, TInEvent, TOutEvent, THandler> DerefMut for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where
THandler: IntoProtocolsHandler,
TConnInfo: ConnectionInfo<PeerId = PeerId>,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.behaviour
}
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> Unpin for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
impl<TBehaviour, TInEvent, TOutEvent, THandler> Unpin for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where
THandler: IntoProtocolsHandler,
TConnInfo: ConnectionInfo<PeerId = PeerId>,
{
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo, THandleErr>
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr>
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
TInEvent: Clone + Send + 'static,
TOutEvent: Send + 'static,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
THandler: IntoProtocolsHandler + Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
THandleErr: error::Error + Send + 'static,
{
/// Builds a new `Swarm`.
pub fn new(
transport: transport::Boxed<(TConnInfo, StreamMuxerBox)>,
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId
) -> Self {
@ -428,7 +419,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// Returns the connection info for an arbitrary connection with the peer, or `None`
/// if there is no connection to that peer.
// TODO: should take &self instead of &mut self, but the API in network requires &mut
pub fn connection_info(me: &mut Self, peer_id: &PeerId) -> Option<TConnInfo> {
pub fn connection_info(me: &mut Self, peer_id: &PeerId) -> Option<PeerId> {
if let Some(mut n) = me.network.peer(peer_id.clone()).into_connected() {
Some(n.some_connection().info().clone())
} else {
@ -529,14 +520,14 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
} else {
log::debug!("Connection {:?} closed (active close).", connected);
}
let info = connected.info;
let peer_id = connected.peer_id;
let endpoint = connected.endpoint;
this.behaviour.inject_connection_closed(info.peer_id(), &id, &endpoint);
this.behaviour.inject_connection_closed(&peer_id, &id, &endpoint);
if num_established == 0 {
this.behaviour.inject_disconnected(info.peer_id());
this.behaviour.inject_disconnected(&peer_id);
}
return Poll::Ready(SwarmEvent::ConnectionClosed {
peer_id: info.peer_id().clone(),
peer_id,
endpoint,
cause: error,
num_established,
@ -774,13 +765,11 @@ enum PendingNotifyHandler {
///
/// Returns `None` if the connection is closing or the event has been
/// successfully sent, in either case the event is consumed.
fn notify_one<'a, TInEvent, TConnInfo>(
conn: &mut EstablishedConnection<'a, TInEvent, TConnInfo>,
fn notify_one<'a, TInEvent>(
conn: &mut EstablishedConnection<'a, TInEvent>,
event: TInEvent,
cx: &mut Context<'_>,
) -> Option<TInEvent>
where
TConnInfo: ConnectionInfo
{
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => Some(event),
@ -803,17 +792,15 @@ where
///
/// Returns `None` if either all connections are closing or the event
/// was successfully sent to a handler, in either case the event is consumed.
fn notify_any<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
fn notify_any<'a, TTrans, TInEvent, TOutEvent, THandler>(
ids: SmallVec<[ConnectionId; 10]>,
peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
event: TInEvent,
cx: &mut Context<'_>,
) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
TPeerId: Eq + Hash + Clone,
TConnInfo: ConnectionInfo<PeerId = TPeerId>
THandler: IntoConnectionHandler,
{
let mut pending = SmallVec::new();
let mut event = Some(event); // (1)
@ -852,18 +839,16 @@ where
/// Returns `None` if all connections are either closing or the event
/// was successfully sent to all handlers whose connections are not closing,
/// in either case the event is consumed.
fn notify_all<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
fn notify_all<'a, TTrans, TInEvent, TOutEvent, THandler>(
ids: SmallVec<[ConnectionId; 10]>,
peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
event: TInEvent,
cx: &mut Context<'_>,
) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
where
TTrans: Transport,
TInEvent: Clone,
THandler: IntoConnectionHandler<TConnInfo>,
TPeerId: Eq + Hash + Clone,
TConnInfo: ConnectionInfo<PeerId = TPeerId>
THandler: IntoConnectionHandler,
{
if ids.len() == 1 {
if let Some(mut conn) = peer.connection(ids[0]) {
@ -893,14 +878,13 @@ where
None
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> Stream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
impl<TBehaviour, TInEvent, TOutEvent, THandler> Stream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Clone + Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
type Item = TBehaviour::OutEvent;
@ -915,14 +899,13 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}
/// the stream of behaviour events never terminates, so we can implement fused for it
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> FusedStream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
impl<TBehaviour, TInEvent, TOutEvent, THandler> FusedStream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Clone + Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
fn is_terminated(&self) -> bool {
false
@ -962,22 +945,21 @@ impl<'a> PollParameters for SwarmPollParameters<'a> {
/// A `SwarmBuilder` provides an API for configuring and constructing a `Swarm`,
/// including the underlying [`Network`].
pub struct SwarmBuilder<TBehaviour, TConnInfo> {
pub struct SwarmBuilder<TBehaviour> {
local_peer_id: PeerId,
transport: transport::Boxed<(TConnInfo, StreamMuxerBox)>,
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
network_config: NetworkConfig,
}
impl<TBehaviour, TConnInfo> SwarmBuilder<TBehaviour, TConnInfo>
impl<TBehaviour> SwarmBuilder<TBehaviour>
where TBehaviour: NetworkBehaviour,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
/// Creates a new `SwarmBuilder` from the given transport, behaviour and
/// local peer ID. The `Swarm` with its underlying `Network` is obtained
/// via [`SwarmBuilder::build`].
pub fn new(
transport: transport::Boxed<(TConnInfo, StreamMuxerBox)>,
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId
) -> Self {
@ -1062,7 +1044,7 @@ where TBehaviour: NetworkBehaviour,
}
/// Builds a `Swarm` with the current configuration.
pub fn build(mut self) -> Swarm<TBehaviour, TConnInfo> {
pub fn build(mut self) -> Swarm<TBehaviour> {
let supported_protocols = self.behaviour
.new_handler()
.inbound_protocol()

View File

@ -31,8 +31,6 @@ use futures::prelude::*;
use futures::stream::FuturesUnordered;
use libp2p_core::{
Multiaddr,
PeerId,
ConnectionInfo,
Connected,
connection::{
ConnectionHandler,
@ -65,18 +63,17 @@ where
}
}
impl<TIntoProtoHandler, TProtoHandler, TConnInfo> IntoConnectionHandler<TConnInfo>
impl<TIntoProtoHandler, TProtoHandler> IntoConnectionHandler
for NodeHandlerWrapperBuilder<TIntoProtoHandler>
where
TIntoProtoHandler: IntoProtocolsHandler<Handler = TProtoHandler>,
TProtoHandler: ProtocolsHandler,
TConnInfo: ConnectionInfo<PeerId = PeerId>,
{
type Handler = NodeHandlerWrapper<TIntoProtoHandler::Handler>;
fn into_handler(self, connected: &Connected<TConnInfo>) -> Self::Handler {
fn into_handler(self, connected: &Connected) -> Self::Handler {
NodeHandlerWrapper {
handler: self.handler.into_handler(connected.peer_id(), &connected.endpoint),
handler: self.handler.into_handler(&connected.peer_id, &connected.endpoint),
negotiating_in: Default::default(),
negotiating_out: Default::default(),
queued_dial_upgrades: Vec::new(),

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Bump `libp2p-core` dependency.

View File

@ -2,7 +2,7 @@
name = "libp2p-dns"
edition = "2018"
description = "DNS transport implementation for libp2p"
version = "0.23.0"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -10,6 +10,6 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[dependencies]
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-core = { version = "0.24.0", path = "../../core" }
log = "0.4.1"
futures = "0.3.1"

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Update `libp2p-core`.

View File

@ -2,7 +2,7 @@
name = "libp2p-tcp"
edition = "2018"
description = "TCP/IP transport protocol for libp2p"
version = "0.23.0"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -15,7 +15,7 @@ futures = "0.3.1"
futures-timer = "3.0"
if-addrs = "0.6.4"
ipnet = "2.0.0"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-core = { version = "0.24.0", path = "../../core" }
log = "0.4.1"
socket2 = "0.3.12"
tokio = { version = "0.2", default-features = false, features = ["tcp"], optional = true }

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Update `libp2p-core` dependency.

View File

@ -2,7 +2,7 @@
name = "libp2p-uds"
edition = "2018"
description = "Unix domain sockets transport for libp2p"
version = "0.23.0"
version = "0.24.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"]
[target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies]
async-std = { version = "1.6.2", optional = true }
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-core = { version = "0.24.0", path = "../../core" }
log = "0.4.1"
futures = "0.3.1"
tokio = { version = "0.2", default-features = false, features = ["uds"], optional = true }

View File

@ -1,3 +1,7 @@
# 0.24.0 [unreleased]
- Update dependencies.
# 0.23.0 [2020-10-16]
- Update `libp2p-core` dependency.

View File

@ -1,6 +1,6 @@
[package]
name = "libp2p-wasm-ext"
version = "0.23.0"
version = "0.24.0"
authors = ["Pierre Krieger <pierre.krieger1708@gmail.com>"]
edition = "2018"
description = "Allows passing in an external transport in a WASM environment"
@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
js-sys = "0.3.19"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-core = { version = "0.24.0", path = "../../core" }
parity-send-wrapper = "0.1.0"
wasm-bindgen = "0.2.42"
wasm-bindgen-futures = "0.4.4"

View File

@ -1,3 +1,7 @@
# 0.25.0 [unreleased]
- Update dependencies.
# 0.24.0 [2020-10-16]
- Update dependencies.

View File

@ -2,7 +2,7 @@
name = "libp2p-websocket"
edition = "2018"
description = "WebSocket transport for libp2p"
version = "0.24.0"
version = "0.25.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
async-tls = "0.10.0"
either = "1.5.3"
futures = "0.3.1"
libp2p-core = { version = "0.23.0", path = "../../core" }
libp2p-core = { version = "0.24.0", path = "../../core" }
log = "0.4.8"
quicksink = "0.1"
rustls = "0.18.0"