From 077b3301abdb9e4b87f3194510ee19a25ea8f937 Mon Sep 17 00:00:00 2001 From: vms Date: Mon, 6 Apr 2020 12:55:56 +0300 Subject: [PATCH] libp2p 0.17 --- CHANGELOG.md | 50 ++++ Cargo.toml | 46 ++-- core/Cargo.toml | 12 +- core/src/connection.rs | 5 +- core/src/connection/manager.rs | 36 +-- core/src/connection/manager/task.rs | 41 ++-- core/src/connection/pool.rs | 20 +- core/src/network.rs | 106 ++++----- core/src/network/event.rs | 2 + core/src/network/peer.rs | 80 ++++++- misc/core-derive/Cargo.toml | 4 +- misc/core-derive/src/lib.rs | 76 +++--- misc/multiaddr/Cargo.toml | 2 +- misc/multistream-select/Cargo.toml | 2 +- misc/peer-id-generator/Cargo.toml | 2 +- muxers/mplex/Cargo.toml | 6 +- muxers/mplex/src/lib.rs | 68 ++++-- muxers/yamux/Cargo.toml | 4 +- protocols/deflate/Cargo.toml | 6 +- protocols/floodsub/Cargo.toml | 6 +- protocols/floodsub/src/layer.rs | 20 +- protocols/gossipsub/Cargo.toml | 10 +- protocols/gossipsub/src/behaviour.rs | 10 +- protocols/gossipsub/src/behaviour/tests.rs | 6 +- protocols/identify/Cargo.toml | 12 +- protocols/identify/src/identify.rs | 43 ++-- protocols/kad/Cargo.toml | 10 +- protocols/kad/src/behaviour.rs | 34 ++- protocols/mdns/Cargo.toml | 6 +- protocols/mdns/src/behaviour.rs | 5 +- protocols/noise/Cargo.toml | 6 +- protocols/ping/Cargo.toml | 12 +- protocols/ping/src/lib.rs | 6 +- protocols/plaintext/Cargo.toml | 4 +- protocols/pnet/Cargo.toml | 2 +- protocols/secio/Cargo.toml | 8 +- swarm/Cargo.toml | 6 +- swarm/src/behaviour.rs | 65 +++++- swarm/src/lib.rs | 245 +++++++++++++------- swarm/src/protocols_handler.rs | 2 +- swarm/src/protocols_handler/node_handler.rs | 16 +- swarm/src/protocols_handler/one_shot.rs | 38 ++- swarm/src/toggle.rs | 20 +- transports/dns/Cargo.toml | 4 +- transports/tcp/Cargo.toml | 4 +- transports/uds/Cargo.toml | 4 +- transports/wasm-ext/Cargo.toml | 4 +- transports/websocket/Cargo.toml | 10 +- transports/websocket/src/framed.rs | 5 - 49 files changed, 733 insertions(+), 458 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c6595a4..05e66be6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,53 @@ +# Version ??? + + +# Version 0.17.0 (2020-04-02) + +- `libp2p-core`: Finished "identity hashing" for peer IDs migration. + [PR 1460](https://github.com/libp2p/rust-libp2p/pull/1460) +- `libp2p-core`: Remove `poll_broadcast`. + [PR 1527](https://github.com/libp2p/rust-libp2p/pull/1527) +- `libp2p-core`, `libp2p-swarm`: Report addresses of closed listeners. + [PR 1485](https://github.com/libp2p/rust-libp2p/pull/1485) +- `libp2p-core`: Support for multiple connections per peer and configurable connection limits. + See [PR #1440](https://github.com/libp2p/rust-libp2p/pull/1440), + [PR #1519](https://github.com/libp2p/rust-libp2p/pull/1519) and + [issue #912](https://github.com/libp2p/rust-libp2p/issues/912) for details. + +- `libp2p-swarm`: Pass the cause of closing a listener to `inject_listener_closed`. + [PR 1517](https://github.com/libp2p/rust-libp2p/pull/1517) +- `libp2p-swarm`: Support for multiple connections per peer and configurable connection limits. + See [PR #1440](https://github.com/libp2p/rust-libp2p/pull/1440), + [PR #1519](https://github.com/libp2p/rust-libp2p/pull/1519) and + [issue #912](https://github.com/libp2p/rust-libp2p/issues/912) for details. +- `libp2p-swarm`: The `SwarmEvent` now returns more events. + [PR 1515](https://github.com/libp2p/rust-libp2p/pull/1515) +- `libp2p-swarm`: New `protocols_handler::multi` module. + [PR 1497](https://github.com/libp2p/rust-libp2p/pull/1497) +- `libp2p-swarm`: Allow configuration of outbound substreams. + [PR 1521](https://github.com/libp2p/rust-libp2p/pull/1521) + +- `libp2p-kad`: Providers returned from a lookup are now deduplicated. + [PR 1528](https://github.com/libp2p/rust-libp2p/pull/1528) +- `libp2p-kad`: Allow customising the maximum packet size. + [PR 1502](https://github.com/libp2p/rust-libp2p/pull/1502) +- `libp2p-kad`: Allow customising the (libp2p) connection keep-alive timeout. + [PR 1477](https://github.com/libp2p/rust-libp2p/pull/1477) +- `libp2p-kad`: Avoid storing records that are expired upon receipt (optimisation). + [PR 1496](https://github.com/libp2p/rust-libp2p/pull/1496) +- `libp2p-kad`: Fixed potential panic on computing record expiry. + [PR 1492](https://github.com/libp2p/rust-libp2p/pull/1492) + +- `libp2p-mplex`: Guard against use of underlying `Sink` upon + error or connection close. + [PR 1529](https://github.com/libp2p/rust-libp2p/pull/1529) + +- `multistream-select`: Upgrade to stable futures. + [PR 1484](https://github.com/libp2p/rust-libp2p/pull/1484) + +- `multihash`: Removed the crate in favour of the upstream crate. + [PR 1472](https://github.com/libp2p/rust-libp2p/pull/1472) + # Version 0.16.2 (2020-02-28) - Fixed yamux connections not properly closing and being stuck in the `CLOSE_WAIT` state. diff --git a/Cargo.toml b/Cargo.toml index 4fed863c..8e04a662 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p" edition = "2018" description = "Peer-to-peer networking library" -version = "0.16.2" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -54,36 +54,36 @@ secp256k1 = ["libp2p-core/secp256k1", "libp2p-secio/secp256k1"] [dependencies] bytes = "0.5" futures = "0.3.1" -multiaddr = { package = "parity-multiaddr", version = "0.7.2", path = "misc/multiaddr" } +multiaddr = { package = "parity-multiaddr", version = "0.8.0", path = "misc/multiaddr" } multihash = "0.10" lazy_static = "1.2" -libp2p-mplex = { version = "0.16.0", path = "muxers/mplex", optional = true } -libp2p-identify = { version = "0.16.0", path = "protocols/identify", optional = true } -libp2p-kad = { version = "0.16.2", path = "protocols/kad", optional = true } -libp2p-floodsub = { version = "0.16.0", path = "protocols/floodsub", optional = true } -libp2p-gossipsub = { version = "0.16.0", path = "./protocols/gossipsub", optional = true } -libp2p-ping = { version = "0.16.0", path = "protocols/ping", optional = true } -libp2p-plaintext = { version = "0.16.0", path = "protocols/plaintext", optional = true } -libp2p-pnet = { version = "0.16.0", path = "protocols/pnet", optional = true } -libp2p-core = { version = "0.16.0", path = "core" } -libp2p-core-derive = { version = "0.16.0", path = "misc/core-derive" } -libp2p-secio = { version = "0.16.1", path = "protocols/secio", default-features = false, optional = true } -libp2p-swarm = { version = "0.16.1", path = "swarm" } -libp2p-uds = { version = "0.16.0", path = "transports/uds", optional = true } -libp2p-wasm-ext = { version = "0.16.2", path = "transports/wasm-ext", optional = true } -libp2p-yamux = { version = "0.16.2", path = "muxers/yamux", optional = true } -libp2p-noise = { version = "0.16.2", path = "protocols/noise", optional = true } +libp2p-mplex = { version = "0.17.0", path = "muxers/mplex", optional = true } +libp2p-identify = { version = "0.17.0", path = "protocols/identify", optional = true } +libp2p-kad = { version = "0.17.0", path = "protocols/kad", optional = true } +libp2p-floodsub = { version = "0.17.0", path = "protocols/floodsub", optional = true } +libp2p-gossipsub = { version = "0.17.0", path = "./protocols/gossipsub", optional = true } +libp2p-ping = { version = "0.17.0", path = "protocols/ping", optional = true } +libp2p-plaintext = { version = "0.17.0", path = "protocols/plaintext", optional = true } +libp2p-pnet = { version = "0.17.0", path = "protocols/pnet", optional = true } +libp2p-core = { version = "0.17.1", path = "core" } +libp2p-core-derive = { version = "0.17.0", path = "misc/core-derive" } +libp2p-secio = { version = "0.17.0", path = "protocols/secio", default-features = false, optional = true } +libp2p-swarm = { version = "0.17.0", path = "swarm" } +libp2p-uds = { version = "0.17.0", path = "transports/uds", optional = true } +libp2p-wasm-ext = { version = "0.17.0", path = "transports/wasm-ext", optional = true } +libp2p-yamux = { version = "0.17.0", path = "muxers/yamux", optional = true } +libp2p-noise = { version = "0.17.0", path = "protocols/noise", optional = true } parking_lot = "0.10.0" pin-project = "0.4.6" smallvec = "1.0" wasm-timer = "0.2.4" [target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies] -libp2p-deflate = { version = "0.16.0", path = "protocols/deflate", optional = true } -libp2p-dns = { version = "0.16.0", path = "transports/dns", optional = true } -libp2p-mdns = { version = "0.16.0", path = "protocols/mdns", optional = true } -libp2p-tcp = { version = "0.16.0", path = "transports/tcp", optional = true } -libp2p-websocket = { version = "0.16.0", path = "transports/websocket", optional = true } +libp2p-deflate = { version = "0.17.0", path = "protocols/deflate", optional = true } +libp2p-dns = { version = "0.17.0", path = "transports/dns", optional = true } +libp2p-mdns = { version = "0.17.0", path = "protocols/mdns", optional = true } +libp2p-tcp = { version = "0.17.0", path = "transports/tcp", optional = true } +libp2p-websocket = { version = "0.17.0", path = "transports/websocket", optional = true } [dev-dependencies] async-std = "1.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index 254828c9..47f60c04 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-core" edition = "2018" description = "Core traits and structs of libp2p" -version = "0.16.0" +version = "0.17.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -20,9 +20,9 @@ futures-timer = "3" lazy_static = "1.2" libsecp256k1 = { version = "0.3.1", optional = true } log = "0.4" -multiaddr = { package = "parity-multiaddr", version = "0.7.3", path = "../misc/multiaddr" } +multiaddr = { package = "parity-multiaddr", version = "0.8.0", path = "../misc/multiaddr" } multihash = "0.10" -multistream-select = { version = "0.7.0", path = "../misc/multistream-select" } +multistream-select = { version = "0.8.0", path = "../misc/multistream-select" } parking_lot = "0.10.0" pin-project = "0.4.6" prost = "0.6.1" @@ -40,9 +40,9 @@ ring = { version = "0.16.9", features = ["alloc", "std"], default-features = fal [dev-dependencies] async-std = "1.0" -libp2p-mplex = { version = "0.16.0", path = "../muxers/mplex" } -libp2p-secio = { version = "0.16.0", path = "../protocols/secio" } -libp2p-tcp = { version = "0.16.0", path = "../transports/tcp" } +libp2p-mplex = { version = "0.17.0", path = "../muxers/mplex" } +libp2p-secio = { version = "0.17.0", path = "../protocols/secio" } +libp2p-tcp = { version = "0.17.0", path = "../transports/tcp" } quickcheck = "0.9.0" wasm-timer = "0.2" diff --git a/core/src/connection.rs b/core/src/connection.rs index de6a03d0..82747f60 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -35,7 +35,7 @@ pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnecti use crate::muxing::StreamMuxer; use crate::{Multiaddr, PeerId}; -use std::{fmt, pin::Pin, task::Context, task::Poll}; +use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll}; use std::hash::Hash; use substream::{Muxing, SubstreamEvent}; @@ -334,3 +334,6 @@ impl fmt::Display for ConnectionLimit { write!(f, "{}/{}", self.current, self.limit) } } + +/// A `ConnectionLimit` can represent an error if it has been exceeded. +impl Error for ConnectionLimit {} diff --git a/core/src/connection/manager.rs b/core/src/connection/manager.rs index c0a4af44..c366a071 100644 --- a/core/src/connection/manager.rs +++ b/core/src/connection/manager.rs @@ -133,7 +133,7 @@ where /// the associated user data. #[derive(Debug)] struct TaskInfo { - /// channel endpoint to send messages to the task + /// Channel endpoint to send messages to the task. sender: mpsc::Sender>, /// The state of the task as seen by the `Manager`. state: TaskState, @@ -286,40 +286,6 @@ impl Manager { ConnectionId(task_id) } - /// Notifies the handlers of all managed connections of an event. - /// - /// This function is "atomic", in the sense that if `Poll::Pending` is - /// returned then no event has been sent. - #[must_use] - pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()> - where - I: Clone - { - for task in self.tasks.values_mut() { - if let Poll::Pending = task.sender.poll_ready(cx) { // (*) - return Poll::Pending; - } - } - - for (id, task) in self.tasks.iter_mut() { - let cmd = task::Command::NotifyHandler(event.clone()); - match task.sender.start_send(cmd) { - Ok(()) => {}, - Err(e) if e.is_full() => unreachable!("by (*)"), - Err(e) if e.is_disconnected() => { - // The background task ended. The manager will eventually be - // informed through an `Error` event from the task. - log::trace!("Connection dropped: {:?}", id); - }, - Err(e) => { - log::error!("Unexpected error: {:?}", e); - } - } - } - - Poll::Ready(()) - } - /// Gets an entry for a managed connection, if it exists. pub fn entry(&mut self, id: ConnectionId) -> Option> { if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) { diff --git a/core/src/connection/manager/task.rs b/core/src/connection/manager/task.rs index 9149e89e..4272722d 100644 --- a/core/src/connection/manager/task.rs +++ b/core/src/connection/manager/task.rs @@ -87,7 +87,7 @@ where commands: stream::Fuse>>, /// Inner state of this `Task`. - state: State, + state: State, } impl Task @@ -111,7 +111,6 @@ where state: State::Pending { future: Box::pin(future), handler, - events: Vec::new() }, } } @@ -133,7 +132,7 @@ where } /// The state associated with the `Task` of a connection. -enum State +enum State where M: StreamMuxer, H: IntoConnectionHandler, @@ -146,12 +145,6 @@ where future: Pin>, /// The intended handler for the established connection. handler: H, - /// While we are dialing the future, we need to buffer the events received via - /// `Command::NotifyHandler` so that they get delivered to the `handler` - /// once the connection is established. We can't leave these in `Task::receiver` - /// because we have to detect if the connection attempt has been aborted (by - /// dropping the corresponding `sender` owned by the manager). - events: Vec }, /// The connection is established and a new event is ready to be emitted. @@ -198,30 +191,29 @@ where 'poll: loop { match std::mem::replace(&mut this.state, State::Done) { - State::Pending { mut future, handler, mut events } => { - // Process commands from the manager. - loop { - match Stream::poll_next(Pin::new(&mut this.commands), cx) { - Poll::Pending => break, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(Command::NotifyHandler(event))) => - events.push(event), - } + State::Pending { mut future, handler } => { + // Check if the manager aborted this task by dropping the `commands` + // channel sender side. + match Stream::poll_next(Pin::new(&mut this.commands), cx) { + Poll::Pending => {}, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(Command::NotifyHandler(_))) => unreachable!( + "Manager does not allow sending commands to pending tasks.", + ) } // Check if the connection succeeded. match Future::poll(Pin::new(&mut future), cx) { Poll::Ready(Ok((info, muxer))) => { - let mut c = Connection::new(muxer, handler.into_handler(&info)); - for event in events { - c.inject_event(event) - } this.state = State::EstablishedReady { - connection: Some(c), + connection: Some(Connection::new( + muxer, + handler.into_handler(&info), + )), event: Event::Established { id, info } } } Poll::Pending => { - this.state = State::Pending { future, handler, events }; + this.state = State::Pending { future, handler }; return Poll::Pending } Poll::Ready(Err(error)) => { @@ -338,4 +330,3 @@ where } } } - diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index e6441beb..56e11236 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -225,7 +225,7 @@ where TPeerId: Clone + Send + 'static, { let endpoint = info.to_connected_point(); - if let Some(limit) = self.limits.max_pending_incoming { + if let Some(limit) = self.limits.max_incoming { let current = self.iter_pending_incoming().count(); if current >= limit { return Err(ConnectionLimit { limit, current }) @@ -330,18 +330,6 @@ where id } - /// Sends an event to all nodes. - /// - /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event - /// has been sent to any node yet. - #[must_use] - pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()> - where - TInEvent: Clone - { - self.manager.poll_broadcast(event, cx) - } - /// Adds an existing established connection to the pool. /// /// Returns the assigned connection ID on success. An error is returned @@ -846,8 +834,8 @@ where /// The configurable limits of a connection [`Pool`]. #[derive(Debug, Clone, Default)] pub struct PoolLimits { - pub max_pending_outgoing: Option, - pub max_pending_incoming: Option, + pub max_outgoing: Option, + pub max_incoming: Option, pub max_established_per_peer: Option, } @@ -863,7 +851,7 @@ impl PoolLimits { where F: FnOnce() -> usize { - Self::check(current, self.max_pending_outgoing) + Self::check(current, self.max_outgoing) } fn check(current: F, limit: Option) -> Result<(), ConnectionLimit> diff --git a/core/src/network.rs b/core/src/network.rs index 52d0da80..10b6e063 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -220,7 +220,7 @@ where /// [`Connection`](crate::connection::Connection) upon success and the /// connection ID is returned. pub fn dial(&mut self, address: &Multiaddr, handler: THandler) - -> Result> + -> Result where TTrans: Transport, TTrans::Error: Send + 'static, @@ -232,10 +232,17 @@ where TConnInfo: Send + 'static, TPeerId: Send + 'static, { - let future = self.transport().clone().dial(address.clone())? - .map_err(|err| PendingConnectionError::Transport(TransportError::Other(err))); let info = OutgoingInfo { address, peer_id: None }; - self.pool.add_outgoing(future, handler, info).map_err(DialError::MaxPending) + match self.transport().clone().dial(address.clone()) { + Ok(f) => { + let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err))); + self.pool.add_outgoing(f, handler, info) + } + Err(err) => { + let f = future::err(PendingConnectionError::Transport(err)); + self.pool.add_outgoing(f, handler, info) + } + } } /// Returns information about the state of the `Network`. @@ -269,24 +276,28 @@ where }) } - /// Notifies the connection handler of _every_ connection of _every_ peer of an event. - /// - /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event - /// has been sent to any node yet. - #[must_use] - pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()> - where - TInEvent: Clone - { - self.pool.poll_broadcast(event, cx) - } - /// Returns a list of all connected peers, i.e. peers to whom the `Network` /// has at least one established connection. pub fn connected_peers(&self) -> impl Iterator { self.pool.iter_connected() } + /// Checks whether the network has an established connection to a peer. + pub fn is_connected(&self, peer: &TPeerId) -> bool { + self.pool.is_connected(peer) + } + + /// Checks whether the network has an ongoing dialing attempt to a peer. + pub fn is_dialing(&self, peer: &TPeerId) -> bool { + self.dialing.contains_key(peer) + } + + /// Checks whether the network has neither an ongoing dialing attempt, + /// nor an established connection to a peer. + pub fn is_disconnected(&self, peer: &TPeerId) -> bool { + !self.is_connected(peer) && !self.is_dialing(peer) + } + /// Returns a list of all the peers to whom a new outgoing connection /// is currently being established. pub fn dialing_peers(&self) -> impl Iterator { @@ -296,7 +307,7 @@ where /// Gets the configured limit on pending incoming connections, /// i.e. concurrent incoming connection attempts. pub fn incoming_limit(&self) -> Option { - self.pool.limits().max_pending_incoming + self.pool.limits().max_incoming } /// The total number of established connections in the `Network`. @@ -392,8 +403,9 @@ where } event } - Poll::Ready(PoolEvent::ConnectionError { connected, error, num_established, .. }) => { + Poll::Ready(PoolEvent::ConnectionError { id, connected, error, num_established, .. }) => { NetworkEvent::ConnectionError { + id, connected, error, num_established, @@ -569,43 +581,6 @@ pub struct NetworkInfo { pub num_connections_established: usize, } -/// The possible errors of [`Network::dial`]. -#[derive(Debug)] -pub enum DialError { - /// The configured limit of pending outgoing connections has been reached. - MaxPending(ConnectionLimit), - /// A transport error occurred when creating the connection. - Transport(TransportError), -} - -impl fmt::Display for DialError -where T: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - DialError::MaxPending(limit) => write!(f, "Dial error (pending limit): {}", limit.current), - DialError::Transport(err) => write!(f, "Dial error (transport): {}", err), - } - } -} - -impl std::error::Error for DialError -where T: std::error::Error + 'static, -{ - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - DialError::MaxPending(_) => None, - DialError::Transport(e) => Some(e), - } - } -} - -impl From> for DialError { - fn from(e: TransportError) -> DialError { - DialError::Transport(e) - } -} - /// The (optional) configuration for a [`Network`]. /// /// The default configuration specifies no dedicated task executor @@ -622,17 +597,29 @@ impl NetworkConfig { self } + /// Shortcut for calling `executor` with an object that calls the given closure. + pub fn set_executor_fn(mut self, f: impl Fn(Pin + Send>>) + Send + 'static) -> Self { + struct SpawnImpl(F); + impl + Send>>)> Executor for SpawnImpl { + fn exec(&self, f: Pin + Send>>) { + (self.0)(f) + } + } + self.set_executor(Box::new(SpawnImpl(f))); + self + } + pub fn executor(&self) -> Option<&Box> { self.executor.as_ref() } - pub fn set_pending_incoming_limit(&mut self, n: usize) -> &mut Self { - self.pool_limits.max_pending_incoming = Some(n); + pub fn set_incoming_limit(&mut self, n: usize) -> &mut Self { + self.pool_limits.max_incoming = Some(n); self } - pub fn set_pending_outgoing_limit(&mut self, n: usize) -> &mut Self { - self.pool_limits.max_pending_outgoing = Some(n); + pub fn set_outgoing_limit(&mut self, n: usize) -> &mut Self { + self.pool_limits.max_outgoing = Some(n); self } @@ -641,4 +628,3 @@ impl NetworkConfig { self } } - diff --git a/core/src/network/event.rs b/core/src/network/event.rs index 233b35fd..afbedcf7 100644 --- a/core/src/network/event.rs +++ b/core/src/network/event.rs @@ -114,6 +114,8 @@ where /// /// The connection is closed as a result of the error. ConnectionError { + /// The ID of the connection that encountered an error. + id: ConnectionId, /// Information about the connection that encountered the error. connected: Connected, /// The error that occurred. diff --git a/core/src/network/peer.rs b/core/src/network/peer.rs index d9f1bda4..b06be772 100644 --- a/core/src/network/peer.rs +++ b/core/src/network/peer.rs @@ -174,28 +174,66 @@ where TConnInfo: fmt::Debug + ConnectionInfo + Send + 'static, TPeerId: Eq + Hash + Clone + Send + 'static, { + /// Checks whether the peer is currently connected. + /// + /// Returns `true` iff [`Peer::into_connected`] returns `Some`. + pub fn is_connected(&self) -> bool { + match self { + Peer::Connected(..) => true, + Peer::Dialing(peer) => peer.is_connected(), + Peer::Disconnected(..) => false, + Peer::Local => false + } + } - /// If we are connected, returns the `ConnectedPeer`. + /// Checks whether the peer is currently being dialed. + /// + /// Returns `true` iff [`Peer::into_dialing`] returns `Some`. + pub fn is_dialing(&self) -> bool { + match self { + Peer::Dialing(_) => true, + Peer::Connected(peer) => peer.is_dialing(), + Peer::Disconnected(..) => false, + Peer::Local => false + } + } + + /// Checks whether the peer is currently disconnected. + /// + /// Returns `true` iff [`Peer::into_disconnected`] returns `Some`. + pub fn is_disconnected(&self) -> bool { + match self { + Peer::Disconnected(..) => true, + _ => false + } + } + + /// Converts the peer into a `ConnectedPeer`, if there an established connection exists. pub fn into_connected(self) -> Option< ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> > { match self { Peer::Connected(peer) => Some(peer), - _ => None, + Peer::Dialing(peer) => peer.into_connected(), + Peer::Disconnected(..) => None, + Peer::Local => None, } } - /// If a connection is pending, returns the `DialingPeer`. + /// Converts the peer into a `DialingPeer`, if a dialing attempt exists. pub fn into_dialing(self) -> Option< DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> > { match self { Peer::Dialing(peer) => Some(peer), - _ => None, + Peer::Connected(peer) => peer.into_dialing(), + Peer::Disconnected(..) => None, + Peer::Local => None } } - /// If we are not connected, returns the `DisconnectedPeer`. + /// Converts the peer into a `DisconnectedPeer`, if neither an established connection + /// nor a dialing attempt exists. pub fn into_disconnected(self) -> Option< DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> > { @@ -225,6 +263,10 @@ where TConnInfo: ConnectionInfo, TPeerId: Eq + Hash + Clone, { + pub fn id(&self) -> &TPeerId { + &self.peer_id + } + /// Attempts to establish a new connection to this peer using the given addresses, /// if there is currently no ongoing dialing attempt. /// @@ -294,7 +336,7 @@ where self.network.dialing.contains_key(&self.peer_id) } - /// Turns this peer into a [`DialingPeer`], if there is an ongoing + /// Converts this peer into a [`DialingPeer`], if there is an ongoing /// dialing attempt, `None` otherwise. pub fn into_dialing(self) -> Option< DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> @@ -373,12 +415,34 @@ where TConnInfo: ConnectionInfo, TPeerId: Eq + Hash + Clone, { + pub fn id(&self) -> &TPeerId { + &self.peer_id + } + /// Disconnects from this peer, closing all pending connections. pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> { self.network.disconnect(&self.peer_id); DisconnectedPeer { network: self.network, peer_id: self.peer_id } } + /// Checks whether there is an established connection to the peer. + /// + /// Returns `true` iff [`DialingPeer::into_connected`] returns `Some`. + pub fn is_connected(&self) -> bool { + self.network.pool.is_connected(&self.peer_id) + } + + /// Converts the peer into a `ConnectedPeer`, if an established connection exists. + pub fn into_connected(self) + -> Option> + { + if self.is_connected() { + Some(ConnectedPeer { peer_id: self.peer_id, network: self.network }) + } else { + None + } + } + /// Obtains the connection that is currently being established. pub fn connection<'b>(&'b mut self) -> DialingConnection<'b, TInEvent, TConnInfo, TPeerId> { let attempt = match self.network.dialing.entry(self.peer_id.clone()) { @@ -452,6 +516,10 @@ where TInEvent: Send + 'static, TOutEvent: Send + 'static, { + pub fn id(&self) -> &TPeerId { + &self.peer_id + } + /// Attempts to connect to this peer using the given addresses. pub fn connect(self, first: Multiaddr, rest: TIter, handler: THandler) -> Result, diff --git a/misc/core-derive/Cargo.toml b/misc/core-derive/Cargo.toml index 85fa2318..b181d745 100644 --- a/misc/core-derive/Cargo.toml +++ b/misc/core-derive/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-core-derive" edition = "2018" description = "Procedural macros of libp2p-core" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -17,4 +17,4 @@ syn = { version = "1.0.8", default-features = false, features = ["clone-impls", quote = "1.0" [dev-dependencies] -libp2p = { version = "0.16.0", path = "../.." } +libp2p = { version = "0.17.0", path = "../.." } diff --git a/misc/core-derive/src/lib.rs b/misc/core-derive/src/lib.rs index 46a21fd3..1153276f 100644 --- a/misc/core-derive/src/lib.rs +++ b/misc/core-derive/src/lib.rs @@ -131,44 +131,52 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { // Build the list of statements to put in the body of `inject_connected()`. let inject_connected_stmts = { - let num_fields = data_struct.fields.iter().filter(|f| !is_ignored(f)).count(); data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { if is_ignored(&field) { return None; } - - Some(if field_n == num_fields - 1 { - match field.ident { - Some(ref i) => quote!{ self.#i.inject_connected(peer_id, endpoint); }, - None => quote!{ self.#field_n.inject_connected(peer_id, endpoint); }, - } - } else { - match field.ident { - Some(ref i) => quote!{ self.#i.inject_connected(peer_id.clone(), endpoint.clone()); }, - None => quote!{ self.#field_n.inject_connected(peer_id.clone(), endpoint.clone()); }, - } + Some(match field.ident { + Some(ref i) => quote!{ self.#i.inject_connected(peer_id); }, + None => quote!{ self.#field_n.inject_connected(peer_id); }, }) }) }; // Build the list of statements to put in the body of `inject_disconnected()`. let inject_disconnected_stmts = { - let num_fields = data_struct.fields.iter().filter(|f| !is_ignored(f)).count(); data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { if is_ignored(&field) { return None; } + Some(match field.ident { + Some(ref i) => quote!{ self.#i.inject_disconnected(peer_id); }, + None => quote!{ self.#field_n.inject_disconnected(peer_id); }, + }) + }) + }; - Some(if field_n == num_fields - 1 { - match field.ident { - Some(ref i) => quote!{ self.#i.inject_disconnected(peer_id, endpoint); }, - None => quote!{ self.#field_n.inject_disconnected(peer_id, endpoint); }, - } - } else { - match field.ident { - Some(ref i) => quote!{ self.#i.inject_disconnected(peer_id, endpoint.clone()); }, - None => quote!{ self.#field_n.inject_disconnected(peer_id, endpoint.clone()); }, - } + // Build the list of statements to put in the body of `inject_connection_established()`. + let inject_connection_established_stmts = { + data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { + if is_ignored(&field) { + return None; + } + Some(match field.ident { + Some(ref i) => quote!{ self.#i.inject_connection_established(peer_id, connection_id, endpoint); }, + None => quote!{ self.#field_n.inject_connection_established(peer_id, connection_id, endpoint); }, + }) + }) + }; + + // Build the list of statements to put in the body of `inject_connection_closed()`. + let inject_connection_closed_stmts = { + data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { + if is_ignored(&field) { + return None; + } + Some(match field.ident { + Some(ref i) => quote!{ self.#i.inject_connection_closed(peer_id, connection_id, endpoint); }, + None => quote!{ self.#field_n.inject_connection_closed(peer_id, connection_id, endpoint); }, }) }) }; @@ -263,8 +271,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { return None } Some(match field.ident { - Some(ref i) => quote!(self.#i.inject_listener_closed(id);), - None => quote!(self.#field_n.inject_listener_closed(id);) + Some(ref i) => quote!(self.#i.inject_listener_closed(id, reason);), + None => quote!(self.#field_n.inject_listener_closed(id, reason);) }) }) }; @@ -383,8 +391,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { std::task::Poll::Ready(#network_behaviour_action::DialAddress { address }) => { return std::task::Poll::Ready(#network_behaviour_action::DialAddress { address }); } - std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id }) => { - return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id }); + std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition }) => { + return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition }); } std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, handler, event }) => { return std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { @@ -421,14 +429,22 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { out } - fn inject_connected(&mut self, peer_id: #peer_id, endpoint: #connected_point) { + fn inject_connected(&mut self, peer_id: &#peer_id) { #(#inject_connected_stmts);* } - fn inject_disconnected(&mut self, peer_id: &#peer_id, endpoint: #connected_point) { + fn inject_disconnected(&mut self, peer_id: &#peer_id) { #(#inject_disconnected_stmts);* } + fn inject_connection_established(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point) { + #(#inject_connection_established_stmts);* + } + + fn inject_connection_closed(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point) { + #(#inject_connection_closed_stmts);* + } + fn inject_addr_reach_failure(&mut self, peer_id: Option<&#peer_id>, addr: &#multiaddr, error: &dyn std::error::Error) { #(#inject_addr_reach_failure_stmts);* } @@ -453,7 +469,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #(#inject_listener_error_stmts);* } - fn inject_listener_closed(&mut self, id: #listener_id) { + fn inject_listener_closed(&mut self, id: #listener_id, reason: Result<(), &std::io::Error>) { #(#inject_listener_closed_stmts);* } diff --git a/misc/multiaddr/Cargo.toml b/misc/multiaddr/Cargo.toml index c30b4504..239407ae 100644 --- a/misc/multiaddr/Cargo.toml +++ b/misc/multiaddr/Cargo.toml @@ -6,7 +6,7 @@ description = "Implementation of the multiaddr format" homepage = "https://github.com/libp2p/rust-libp2p" keywords = ["multiaddr", "ipfs"] license = "MIT" -version = "0.7.3" +version = "0.8.0" [dependencies] arrayref = "0.3" diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index 33857b11..fa4f03e9 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "multistream-select" description = "Multistream-select negotiation protocol for libp2p" -version = "0.7.0" +version = "0.8.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/misc/peer-id-generator/Cargo.toml b/misc/peer-id-generator/Cargo.toml index f5159d49..aa6d19b6 100644 --- a/misc/peer-id-generator/Cargo.toml +++ b/misc/peer-id-generator/Cargo.toml @@ -11,5 +11,5 @@ categories = ["network-programming", "asynchronous"] publish = false [dependencies] -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } num_cpus = "1.8" diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index f789b9fe..b661e38f 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-mplex" edition = "2018" description = "Mplex multiplexing protocol for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,11 +14,11 @@ bytes = "0.5" fnv = "1.0" futures = "0.3.1" futures_codec = "0.3.4" -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } log = "0.4" parking_lot = "0.10" unsigned-varint = { version = "0.3", features = ["futures-codec"] } [dev-dependencies] async-std = "1.0" -libp2p-tcp = { version = "0.16.0", path = "../../transports/tcp" } +libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 64c532f8..d8350606 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -53,7 +53,6 @@ pub struct MplexConfig { impl MplexConfig { /// Builds the default configuration. - #[inline] pub fn new() -> MplexConfig { Default::default() } @@ -62,7 +61,6 @@ impl MplexConfig { /// generated and the connection closes. /// /// A limit is necessary in order to avoid DoS attacks. - #[inline] pub fn max_substreams(&mut self, max: usize) -> &mut Self { self.max_substreams = max; self @@ -71,7 +69,6 @@ impl MplexConfig { /// Sets the maximum number of pending incoming messages. /// /// A limit is necessary in order to avoid DoS attacks. - #[inline] pub fn max_buffer_len(&mut self, max: usize) -> &mut Self { self.max_buffer_len = max; self @@ -80,7 +77,6 @@ impl MplexConfig { /// Sets the behaviour when the maximum buffer length has been reached. /// /// See the documentation of `MaxBufferBehaviour`. - #[inline] pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self { self.max_buffer_behaviour = behaviour; self @@ -94,7 +90,6 @@ impl MplexConfig { self } - #[inline] fn upgrade(self, i: C) -> Multiplex where C: AsyncRead + AsyncWrite + Unpin @@ -122,7 +117,6 @@ impl MplexConfig { } impl Default for MplexConfig { - #[inline] fn default() -> MplexConfig { MplexConfig { max_substreams: 128, @@ -149,7 +143,6 @@ impl UpgradeInfo for MplexConfig { type Info = &'static [u8]; type InfoIter = iter::Once; - #[inline] fn protocol_info(&self) -> Self::InfoIter { iter::once(b"/mplex/6.7.0") } @@ -334,9 +327,7 @@ where C: AsyncRead + AsyncWrite + Unpin, fn poll_send(inner: &mut MultiplexInner, cx: &mut Context, elem: codec::Elem) -> Poll> where C: AsyncRead + AsyncWrite + Unpin { - if inner.is_shutdown { - return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down"))) - } + ensure_no_error_no_close(inner)?; inner.notifier_write.insert(cx.waker()); @@ -348,10 +339,26 @@ where C: AsyncRead + AsyncWrite + Unpin } }, Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => Poll::Ready(Err(err)) + Poll::Ready(Err(err)) => { + inner.error = Err(IoError::new(err.kind(), err.to_string())); + Poll::Ready(Err(err)) + } } } +fn ensure_no_error_no_close(inner: &mut MultiplexInner) -> Result<(), IoError> +where + C: AsyncRead + AsyncWrite + Unpin +{ + if inner.is_shutdown { + return Err(IoError::new(IoErrorKind::Other, "connection is shut down")) + } + if let Err(ref e) = inner.error { + return Err(IoError::new(e.kind(), e.to_string())) + } + Ok(()) +} + impl StreamMuxer for Multiplex where C: AsyncRead + AsyncWrite + Unpin { @@ -418,9 +425,7 @@ where C: AsyncRead + AsyncWrite + Unpin poll_send(&mut inner, cx, elem.clone()) }, OutboundSubstreamState::Flush => { - if inner.is_shutdown { - return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down"))) - } + ensure_no_error_no_close(&mut inner)?; let inner = &mut *inner; // Avoids borrow errors inner.notifier_write.insert(cx.waker()); Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) @@ -438,6 +443,7 @@ where C: AsyncRead + AsyncWrite + Unpin inner.buffer.retain(|elem| { elem.substream_id() != substream.num || elem.endpoint() == Some(Endpoint::Dialer) }); + inner.error = Err(IoError::new(err.kind(), err.to_string())); return Poll::Ready(Err(err)); }, }; @@ -465,7 +471,6 @@ where C: AsyncRead + AsyncWrite + Unpin } } - #[inline] fn destroy_outbound(&self, _substream: Self::OutboundSubstream) { // Nothing to do. } @@ -548,13 +553,14 @@ where C: AsyncRead + AsyncWrite + Unpin fn flush_substream(&self, cx: &mut Context, _substream: &mut Self::Substream) -> Poll> { let mut inner = self.inner.lock(); - if inner.is_shutdown { - return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down"))) - } - + ensure_no_error_no_close(&mut inner)?; let inner = &mut *inner; // Avoids borrow errors inner.notifier_write.insert(cx.waker()); - Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) + let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))); + if let Poll::Ready(Err(err)) = &result { + inner.error = Err(IoError::new(err.kind(), err.to_string())); + } + result } fn shutdown_substream(&self, cx: &mut Context, sub: &mut Self::Substream) -> Poll> { @@ -585,28 +591,42 @@ where C: AsyncRead + AsyncWrite + Unpin self.inner.lock().is_acknowledged } - #[inline] fn close(&self, cx: &mut Context) -> Poll> { let inner = &mut *self.inner.lock(); + if inner.is_shutdown { + return Poll::Ready(Ok(())) + } + if let Err(ref e) = inner.error { + return Poll::Ready(Err(IoError::new(e.kind(), e.to_string()))) + } inner.notifier_write.insert(cx.waker()); match Sink::poll_close(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) { Poll::Ready(Ok(())) => { inner.is_shutdown = true; Poll::Ready(Ok(())) } - Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Ready(Err(err)) => { + inner.error = Err(IoError::new(err.kind(), err.to_string())); + Poll::Ready(Err(err)) + } Poll::Pending => Poll::Pending, } } - #[inline] fn flush_all(&self, cx: &mut Context) -> Poll> { let inner = &mut *self.inner.lock(); if inner.is_shutdown { return Poll::Ready(Ok(())) } + if let Err(ref e) = inner.error { + return Poll::Ready(Err(IoError::new(e.kind(), e.to_string()))) + } inner.notifier_write.insert(cx.waker()); - Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) + let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))); + if let Poll::Ready(Err(err)) = &result { + inner.error = Err(IoError::new(err.kind(), err.to_string())); + } + result } } diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index 6c59f932..a79d49f7 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-yamux" edition = "2018" description = "Yamux multiplexing protocol for libp2p" -version = "0.16.2" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } parking_lot = "0.10" thiserror = "1.0" yamux = "0.4.5" diff --git a/protocols/deflate/Cargo.toml b/protocols/deflate/Cargo.toml index bdf502b6..c4aa2262 100644 --- a/protocols/deflate/Cargo.toml +++ b/protocols/deflate/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-deflate" edition = "2018" description = "Deflate encryption protocol for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,11 +11,11 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } flate2 = "1.0" [dev-dependencies] async-std = "1.0" -libp2p-tcp = { version = "0.16.0", path = "../../transports/tcp" } +libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } rand = "0.7" quickcheck = "0.9" diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index 8b58fa2c..f1ea3a0b 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-floodsub" edition = "2018" description = "Floodsub protocol for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,8 +13,8 @@ categories = ["network-programming", "asynchronous"] cuckoofilter = "0.3.2" fnv = "1.0" futures = "0.3.1" -libp2p-core = { version = "0.16.0", path = "../../core" } -libp2p-swarm = { version = "0.16.0", path = "../../swarm" } +libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-swarm = { version = "0.17.0", path = "../../swarm" } prost = "0.6.1" rand = "0.7" smallvec = "1.0" diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 18e87c54..1c837b2d 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -22,14 +22,15 @@ use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubs use crate::topic::Topic; use cuckoofilter::CuckooFilter; use fnv::FnvHashSet; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId}; +use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId}; use libp2p_swarm::{ NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler, OneShotHandler, - NotifyHandler + NotifyHandler, + DialPeerCondition, }; use rand; use smallvec::SmallVec; @@ -96,7 +97,9 @@ impl Floodsub { } if self.target_peers.insert(peer_id.clone()) { - self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id }); + self.events.push_back(NetworkBehaviourAction::DialPeer { + peer_id, condition: DialPeerCondition::Disconnected + }); } } @@ -236,9 +239,9 @@ impl NetworkBehaviour for Floodsub { Vec::new() } - fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { + fn inject_connected(&mut self, id: &PeerId) { // We need to send our subscriptions to the newly-connected node. - if self.target_peers.contains(&id) { + if self.target_peers.contains(id) { for topic in self.subscribed_topics.iter().cloned() { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: id.clone(), @@ -257,14 +260,17 @@ impl NetworkBehaviour for Floodsub { self.connected_peers.insert(id.clone(), SmallVec::new()); } - fn inject_disconnected(&mut self, id: &PeerId, _: ConnectedPoint) { + fn inject_disconnected(&mut self, id: &PeerId) { let was_in = self.connected_peers.remove(id); debug_assert!(was_in.is_some()); // We can be disconnected by the remote in case of inactivity for example, so we always // try to reconnect. if self.target_peers.contains(id) { - self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: id.clone() }); + self.events.push_back(NetworkBehaviourAction::DialPeer { + peer_id: id.clone(), + condition: DialPeerCondition::Disconnected + }); } } diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 5ff571fc..8826b9b1 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-gossipsub" edition = "2018" description = "Gossipsub protocol for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Age Manning "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -10,8 +10,8 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-swarm = { version = "0.16.0", path = "../../swarm" } -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-swarm = { version = "0.17.0", path = "../../swarm" } +libp2p-core = { version = "0.17.0", path = "../../core" } bytes = "0.5.4" byteorder = "1.3.2" fnv = "1.0.6" @@ -30,8 +30,8 @@ prost = "0.6.1" [dev-dependencies] async-std = "1.4.0" env_logger = "0.7.1" -libp2p-plaintext = { version = "0.16.0", path = "../plaintext" } -libp2p-yamux = { version = "0.16.0", path = "../../muxers/yamux" } +libp2p-plaintext = { version = "0.17.0", path = "../plaintext" } +libp2p-yamux = { version = "0.17.0", path = "../../muxers/yamux" } quickcheck = "0.9.2" [build-dependencies] diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 2e55571a..63611df9 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -27,7 +27,7 @@ use crate::protocol::{ }; use crate::topic::{Topic, TopicHash}; use futures::prelude::*; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId}; +use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId}; use libp2p_swarm::{ NetworkBehaviour, NetworkBehaviourAction, @@ -1012,7 +1012,7 @@ impl NetworkBehaviour for Gossipsub { Vec::new() } - fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { + fn inject_connected(&mut self, id: &PeerId) { info!("New peer connected: {:?}", id); // We need to send our subscriptions to the newly-connected node. let mut subscriptions = vec![]; @@ -1040,7 +1040,7 @@ impl NetworkBehaviour for Gossipsub { self.peer_topics.insert(id.clone(), Vec::new()); } - fn inject_disconnected(&mut self, id: &PeerId, _: ConnectedPoint) { + fn inject_disconnected(&mut self, id: &PeerId) { // remove from mesh, topic_peers, peer_topic and fanout debug!("Peer disconnected: {:?}", id); { @@ -1164,8 +1164,8 @@ impl NetworkBehaviour for Gossipsub { NetworkBehaviourAction::DialAddress { address } => { return Poll::Ready(NetworkBehaviourAction::DialAddress { address }); } - NetworkBehaviourAction::DialPeer { peer_id } => { - return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }); + NetworkBehaviourAction::DialPeer { peer_id, condition } => { + return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }); } NetworkBehaviourAction::ReportObservedAddr { address } => { return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }); diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index a3ca6ea5..e2073151 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -49,17 +49,13 @@ mod tests { // build and connect peer_no random peers let mut peers = vec![]; - let dummy_connected_point = ConnectedPoint::Dialer { - address: "/ip4/0.0.0.0/tcp/0".parse().unwrap(), - }; for _ in 0..peer_no { let peer = PeerId::random(); peers.push(peer.clone()); ::inject_connected( &mut gs, - peer.clone(), - dummy_connected_point.clone(), + &peer, ); if to_subscribe { gs.handle_received_subscriptions( diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index f60a2610..07aa7500 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-identify" edition = "2018" description = "Nodes identifcation protocol for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,8 +11,8 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.16.0", path = "../../core" } -libp2p-swarm = { version = "0.16.0", path = "../../swarm" } +libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-swarm = { version = "0.17.0", path = "../../swarm" } log = "0.4.1" prost = "0.6.1" smallvec = "1.0" @@ -20,9 +20,9 @@ wasm-timer = "0.2" [dev-dependencies] async-std = "1.0" -libp2p-mplex = { version = "0.16.0", path = "../../muxers/mplex" } -libp2p-secio = { version = "0.16.0", path = "../../protocols/secio" } -libp2p-tcp = { version = "0.16.0", path = "../../transports/tcp" } +libp2p-mplex = { version = "0.17.0", path = "../../muxers/mplex" } +libp2p-secio = { version = "0.17.0", path = "../../protocols/secio" } +libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } [build-dependencies] prost-build = "0.6" diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index fe78bc1b..312e273d 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -37,7 +37,13 @@ use libp2p_swarm::{ ProtocolsHandler, ProtocolsHandlerUpgrErr }; -use std::{collections::HashMap, collections::VecDeque, io, pin::Pin, task::Context, task::Poll}; +use std::{ + collections::{HashMap, VecDeque}, + io, + pin::Pin, + task::Context, + task::Poll +}; /// Network behaviour that automatically identifies nodes periodically, returns information /// about them, and answers identify queries from other nodes. @@ -49,7 +55,7 @@ pub struct Identify { /// The public key of the local node. To report on the wire. local_public_key: PublicKey, /// For each peer we're connected to, the observed address to send back to it. - observed_addresses: HashMap, + observed_addresses: HashMap>, /// Pending replies to send. pending_replies: VecDeque, /// Pending events to be emitted when polled. @@ -97,23 +103,32 @@ impl NetworkBehaviour for Identify { Vec::new() } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - let observed = match endpoint { - ConnectedPoint::Dialer { address } => address, - ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, - }; - - self.observed_addresses.insert(peer_id, observed); + fn inject_connected(&mut self, _: &PeerId) { } - fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) { + fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + let addr = match endpoint { + ConnectedPoint::Dialer { address } => address.clone(), + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(), + }; + + self.observed_addresses.entry(peer_id.clone()).or_default().insert(*conn, addr); + } + + fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) { + if let Some(addrs) = self.observed_addresses.get_mut(peer_id) { + addrs.remove(conn); + } + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { self.observed_addresses.remove(peer_id); } fn inject_event( &mut self, peer_id: PeerId, - _connection: ConnectionId, + connection: ConnectionId, event: ::OutEvent, ) { match event { @@ -132,9 +147,9 @@ impl NetworkBehaviour for Identify { } IdentifyHandlerEvent::Identify(sender) => { let observed = self.observed_addresses.get(&peer_id) - .expect("We only receive events from nodes we're connected to. We insert \ - into the hashmap when we connect to a node and remove only when we \ - disconnect; QED"); + .and_then(|addrs| addrs.get(&connection)) + .expect("`inject_event` is only called with an established connection \ + and `inject_connection_established` ensures there is an entry; qed"); self.pending_replies.push_back( Reply::Queued { peer: peer_id, diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 3264d224..6e914a90 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-kad" edition = "2018" description = "Kademlia protocol for libp2p" -version = "0.16.2" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -17,8 +17,8 @@ fnv = "1.0" futures_codec = "0.3.4" futures = "0.3.1" log = "0.4" -libp2p-core = { version = "0.16.0", path = "../../core" } -libp2p-swarm = { version = "0.16.0", path = "../../swarm" } +libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-swarm = { version = "0.17.0", path = "../../swarm" } multihash = "0.10" prost = "0.6.1" rand = "0.7.2" @@ -34,8 +34,8 @@ derivative = "2.0.2" trust-graph = { git = "ssh://git@github.com/fluencelabs/arqada.git", branch = "master" } [dev-dependencies] -libp2p-secio = { version = "0.16.0", path = "../secio" } -libp2p-yamux = { version = "0.16.0", path = "../../muxers/yamux" } +libp2p-secio = { version = "0.17.0", path = "../secio" } +libp2p-yamux = { version = "0.17.0", path = "../../muxers/yamux" } quickcheck = "0.9.0" [build-dependencies] diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 3bd76ef7..26c021b2 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -34,6 +34,7 @@ use crate::contact::Contact; use fnv::{FnvHashMap, FnvHashSet}; use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId, multiaddr}; use libp2p_swarm::{ + DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, @@ -43,7 +44,7 @@ use libp2p_swarm::{ use log::{info, debug, warn}; use smallvec::SmallVec; use std::{borrow::{Borrow, Cow}, error, iter, time::Duration}; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::num::NonZeroUsize; use std::task::{Context, Poll}; use wasm_timer::Instant; @@ -546,7 +547,7 @@ where self.print_bucket_table(); let info = QueryInfo::GetProviders { key: key.clone(), - providers: Vec::new(), + providers: HashSet::new(), }; let target = kbucket::Key::new(key); debug!( @@ -1228,12 +1229,25 @@ where peer_addrs } - fn inject_connected(&mut self, peer: PeerId, endpoint: ConnectedPoint) { + fn inject_connection_established(&mut self, peer: &PeerId, _: &ConnectionId, endpoint: &ConnectedPoint) { + // The remote's address can only be put into the routing table, + // and thus shared with other nodes, if the local node is the dialer, + // since the remote address on an inbound connection is specific to + // that connection (e.g. typically the TCP port numbers). + let address = match endpoint { + ConnectedPoint::Dialer { address } => Some(address.clone()), + ConnectedPoint::Listener { .. } => None, + }; + + self.connection_updated(peer.clone(), address, NodeStatus::Connected); + } + + fn inject_connected(&mut self, peer: &PeerId) { // Queue events for sending pending RPCs to the connected peer. // There can be only one pending RPC for a particular peer and query per definition. for (peer_id, event) in self.queries.iter_mut().filter_map(|q| q.inner.pending_rpcs.iter() - .position(|(p, _)| p == &peer) + .position(|(p, _)| p == peer) .map(|p| q.inner.pending_rpcs.remove(p))) { self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler { @@ -1312,7 +1326,7 @@ where } } - fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) { + fn inject_disconnected(&mut self, id: &PeerId) { for query in self.queries.iter_mut() { query.on_failure(id); } @@ -1378,7 +1392,7 @@ where providers, .. } = &mut query.inner.info { for peer in provider_peers { - providers.push(peer.node_id); + providers.insert(peer.node_id); } } } @@ -1587,7 +1601,7 @@ where } else if &peer_id != self.kbuckets.local_key().preimage() { query.inner.pending_rpcs.push((peer_id.clone(), event)); self.queued_events.push_back(NetworkBehaviourAction::DialPeer { - peer_id + peer_id, condition: DialPeerCondition::Disconnected }); } } @@ -1851,7 +1865,7 @@ pub type GetProvidersResult = Result; #[derive(Debug, Clone)] pub struct GetProvidersOk { pub key: record::Key, - pub providers: Vec, + pub providers: HashSet, pub closest_peers: Vec } @@ -1860,7 +1874,7 @@ pub struct GetProvidersOk { pub enum GetProvidersError { Timeout { key: record::Key, - providers: Vec, + providers: HashSet, closest_peers: Vec } } @@ -2010,7 +2024,7 @@ enum QueryInfo { /// The key for which to search for providers. key: record::Key, /// The found providers. - providers: Vec, + providers: HashSet, }, /// A query that searches for the closest closest nodes to a key to be diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 4bbc3660..5d21d3fa 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "libp2p-mdns" edition = "2018" -version = "0.16.0" +version = "0.17.0" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] license = "MIT" @@ -16,8 +16,8 @@ dns-parser = "0.8" either = "1.5.3" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.16.0", path = "../../core" } -libp2p-swarm = { version = "0.16.0", path = "../../swarm" } +libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-swarm = { version = "0.17.0", path = "../../swarm" } log = "0.4" net2 = "0.2" rand = "0.7" diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 85685f72..abc580ff 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -21,7 +21,6 @@ use crate::service::{MdnsService, MdnsPacket, build_query_response, build_service_discovery_response}; use futures::prelude::*; use libp2p_core::{ - ConnectedPoint, Multiaddr, PeerId, address_translation, @@ -199,9 +198,9 @@ impl NetworkBehaviour for Mdns { .collect() } - fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} + fn inject_connected(&mut self, _: &PeerId) {} - fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + fn inject_disconnected(&mut self, _: &PeerId) {} fn inject_event( &mut self, diff --git a/protocols/noise/Cargo.toml b/protocols/noise/Cargo.toml index 19ac80e4..802d5885 100644 --- a/protocols/noise/Cargo.toml +++ b/protocols/noise/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "libp2p-noise" description = "Cryptographic handshake protocol using the noise framework." -version = "0.16.2" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ edition = "2018" curve25519-dalek = "2.0.0" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } log = "0.4" prost = "0.6.1" rand = "0.7.2" @@ -28,7 +28,7 @@ snow = { version = "0.6.1", features = ["default-resolver"], default-features = [dev-dependencies] env_logger = "0.7.1" -libp2p-tcp = { version = "0.16.0", path = "../../transports/tcp" } +libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } quickcheck = "0.9.0" sodiumoxide = "^0.2.5" diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index dc6c292c..942ae48a 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-ping" edition = "2018" description = "Ping protocol for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,8 +11,8 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.16.0", path = "../../core" } -libp2p-swarm = { version = "0.16.0", path = "../../swarm" } +libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-swarm = { version = "0.17.0", path = "../../swarm" } log = "0.4.1" rand = "0.7.2" void = "1.0" @@ -20,7 +20,7 @@ wasm-timer = "0.2" [dev-dependencies] async-std = "1.0" -libp2p-tcp = { version = "0.16.0", path = "../../transports/tcp" } -libp2p-secio = { version = "0.16.0", path = "../../protocols/secio" } -libp2p-yamux = { version = "0.16.0", path = "../../muxers/yamux" } +libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } +libp2p-secio = { version = "0.17.0", path = "../../protocols/secio" } +libp2p-yamux = { version = "0.17.0", path = "../../muxers/yamux" } quickcheck = "0.9.0" diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index 69126644..82b828ad 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -47,7 +47,7 @@ pub mod handler; pub use handler::{PingConfig, PingResult, PingSuccess, PingFailure}; use handler::PingHandler; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId}; +use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId}; use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use std::{collections::VecDeque, task::Context, task::Poll}; use void::Void; @@ -100,9 +100,9 @@ impl NetworkBehaviour for Ping { Vec::new() } - fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} + fn inject_connected(&mut self, _: &PeerId) {} - fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + fn inject_disconnected(&mut self, _: &PeerId) {} fn inject_event(&mut self, peer: PeerId, _: ConnectionId, result: PingResult) { self.events.push_front(PingEvent { peer, result }) diff --git a/protocols/plaintext/Cargo.toml b/protocols/plaintext/Cargo.toml index e1935a29..b8df5dc8 100644 --- a/protocols/plaintext/Cargo.toml +++ b/protocols/plaintext/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-plaintext" edition = "2018" description = "Plaintext encryption dummy protocol for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] bytes = "0.5" futures = "0.3.1" futures_codec = "0.3.4" -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } log = "0.4.8" prost = "0.6.1" rw-stream-sink = "0.2.0" diff --git a/protocols/pnet/Cargo.toml b/protocols/pnet/Cargo.toml index 0b5e69ad..ca55ac1f 100644 --- a/protocols/pnet/Cargo.toml +++ b/protocols/pnet/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-pnet" edition = "2018" description = "Private swarm support for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/secio/Cargo.toml b/protocols/secio/Cargo.toml index 7874f807..66448676 100644 --- a/protocols/secio/Cargo.toml +++ b/protocols/secio/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-secio" edition = "2018" description = "Secio encryption protocol for libp2p" -version = "0.16.1" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -16,7 +16,7 @@ ctr = "0.3" futures = "0.3.1" hmac = "0.7.0" lazy_static = "1.2.0" -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } log = "0.4.6" prost = "0.6.1" pin-project = "0.4.6" @@ -48,8 +48,8 @@ aes-all = ["aesni"] [dev-dependencies] async-std = "1.0" criterion = "0.3" -libp2p-mplex = { version = "0.16.0", path = "../../muxers/mplex" } -libp2p-tcp = { version = "0.16.0", path = "../../transports/tcp" } +libp2p-mplex = { version = "0.17.0", path = "../../muxers/mplex" } +libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } [[bench]] name = "bench" diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 29bccee8..cb37d555 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-swarm" edition = "2018" description = "The libp2p swarm" -version = "0.16.1" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.16.0", path = "../core" } +libp2p-core = { version = "0.17.0", path = "../core" } log = "0.4" rand = "0.7" smallvec = "1.0" @@ -19,6 +19,6 @@ wasm-timer = "0.2" void = "1" [dev-dependencies] -libp2p-mplex = { version = "0.16.0", path = "../muxers/mplex" } +libp2p-mplex = { version = "0.17.0", path = "../muxers/mplex" } quickcheck = "0.9.0" rand = "0.7.2" diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 97497b8a..fdddc6a0 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -72,18 +72,34 @@ pub trait NetworkBehaviour: Send + 'static { /// address should be the most likely to be reachable. fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec; - /// Indicates the behaviour that we connected to the node with the given peer id through the - /// given endpoint. + /// Indicates the behaviour that we connected to the node with the given peer id. /// /// This node now has a handler (as spawned by `new_handler`) running in the background. - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint); + /// + /// This method is only called when the connection to the peer is + /// established, preceded by `inject_connection_established`. + fn inject_connected(&mut self, peer_id: &PeerId); - /// Indicates the behaviour that we disconnected from the node with the given peer id. The - /// endpoint is the one we used to be connected to. + /// Indicates the behaviour that we disconnected from the node with the given peer id. /// /// There is no handler running anymore for this node. Any event that has been sent to it may /// or may not have been processed by the handler. - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint); + /// + /// This method is only called when the last established connection to the peer + /// is closed, preceded by `inject_connection_closed`. + fn inject_disconnected(&mut self, peer_id: &PeerId); + + /// Informs the behaviour about a newly established connection to a peer. + fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) + {} + + /// Informs the behaviour about a closed connection to a peer. + /// + /// A call to this method is always paired with an earlier call to + /// `inject_connection_established` with the same peer ID, connection ID and + /// endpoint. + fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) + {} /// Informs the behaviour about an event generated by the handler dedicated to the peer identified by `peer_id`. /// for the behaviour. @@ -130,7 +146,7 @@ pub trait NetworkBehaviour: Send + 'static { } /// A listener closed. - fn inject_listener_closed(&mut self, _id: ListenerId) { + fn inject_listener_closed(&mut self, _id: ListenerId, _reason: Result<(), &std::io::Error>) { } /// Polls for things that swarm should do. @@ -204,6 +220,8 @@ pub enum NetworkBehaviourAction { DialPeer { /// The peer to try reach. peer_id: PeerId, + /// The condition for initiating a new dialing attempt. + condition: DialPeerCondition, }, /// Instructs the `Swarm` to send an event to the handler dedicated to a @@ -253,3 +271,36 @@ pub enum NotifyHandler { All } +/// The available conditions under which a new dialing attempt to +/// a peer is initiated when requested by [`NetworkBehaviourAction::DialPeer`]. +#[derive(Debug, Copy, Clone)] +#[non_exhaustive] +pub enum DialPeerCondition { + /// A new dialing attempt is initiated _only if_ the peer is currently + /// considered disconnected, i.e. there is no established connection + /// and no ongoing dialing attempt. + /// + /// If there is an ongoing dialing attempt, the addresses reported by + /// [`NetworkBehaviour::addresses_of_peer`] are added to the ongoing + /// dialing attempt, ignoring duplicates. + Disconnected, + /// A new dialing attempt is initiated _only if_ there is currently + /// no ongoing dialing attempt, i.e. the peer is either considered + /// disconnected or connected but without an ongoing dialing attempt. + /// + /// If there is an ongoing dialing attempt, the addresses reported by + /// [`NetworkBehaviour::addresses_of_peer`] are added to the ongoing + /// dialing attempt, ignoring duplicates. + /// + /// This condition implies [`DialPeerCondition::Disconnected`]. + NotDialing, + // TODO: Once multiple dialing attempts per peer are permitted. + // See https://github.com/libp2p/rust-libp2p/pull/1506. + // Always, +} + +impl Default for DialPeerCondition { + fn default() -> Self { + DialPeerCondition::Disconnected + } +} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index b9a2e6dc..11bb1a05 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -65,7 +65,8 @@ pub use behaviour::{ NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, - NotifyHandler + NotifyHandler, + DialPeerCondition }; pub use protocols_handler::{ IntoProtocolsHandler, @@ -76,6 +77,7 @@ pub use protocols_handler::{ ProtocolsHandlerSelect, ProtocolsHandlerUpgrErr, OneShotHandler, + OneShotHandlerConfig, SubstreamProtocol }; @@ -89,7 +91,6 @@ use futures::{ stream::FusedStream, }; use libp2p_core::{ - ConnectedPoint, Executor, Transport, Multiaddr, @@ -99,6 +100,8 @@ use libp2p_core::{ ConnectionError, ConnectionId, ConnectionInfo, + ConnectionLimit, + ConnectedPoint, EstablishedConnection, IntoConnectionHandler, ListenerId, @@ -108,7 +111,6 @@ use libp2p_core::{ transport::{TransportError, boxed::Boxed as BoxTransport}, muxing::{StreamMuxer, StreamMuxerBox}, network::{ - DialError, Network, NetworkInfo, NetworkEvent, @@ -201,12 +203,6 @@ pub enum SwarmEvent { /// Endpoint of the connection that has been closed. endpoint: ConnectedPoint, }, - /// Starting to try to reach the given peer. - /// - /// We are trying to connect to this peer until a [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) - /// event is reported, or a [`UnreachableAddr`](SwarmEvent::UnreachableAddr) event is reported - /// with `attempts_remaining` equal to 0. - Dialing(PeerId), /// Tried to dial an address but it ended up being unreachaable. UnreachableAddr { /// `PeerId` that we were trying to reach. @@ -246,6 +242,13 @@ pub enum SwarmEvent { /// The listener error. error: io::Error, }, + /// A new dialing attempt has been initiated. + /// + /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) + /// event is reported if the dialing attempt succeeds, otherwise a + /// [`UnreachableAddr`](SwarmEvent::UnreachableAddr) event is reported + /// with `attempts_remaining` equal to 0. + Dialing(PeerId), } /// Contains the state of the network, plus the way it should behave. @@ -367,31 +370,65 @@ where TBehaviour: NetworkBehaviour, /// Tries to dial the given address. /// /// Returns an error if the address is not supported. - pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), DialError> { + pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> { let handler = me.behaviour.new_handler(); me.network.dial(&addr, handler.into_node_handler_builder()).map(|_id| ()) } - /// Tries to reach the given peer using the elements in the topology. + /// Tries to initiate a dialing attempt to the given peer. /// - /// Has no effect if we are already connected to that peer, or if no address is known for the - /// peer. - pub fn dial(me: &mut Self, peer_id: PeerId) { - let addrs = me.behaviour.addresses_of_peer(&peer_id); + /// If a new dialing attempt has been initiated, `Ok(true)` is returned. + /// + /// If there is an ongoing dialing attempt, the current addresses of the + /// peer, as reported by [`NetworkBehaviour::addresses_of_peer`] are added + /// to the ongoing dialing attempt, ignoring duplicates. In this case no + /// new dialing attempt is initiated. + /// + /// If no new dialing attempt has been initiated, meaning there is an ongoing + /// dialing attempt or `addresses_of_peer` reports no addresses, `Ok(false)` + /// is returned. + pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result { + let mut addrs = me.behaviour.addresses_of_peer(peer_id).into_iter(); match me.network.peer(peer_id.clone()) { Peer::Disconnected(peer) => { - let mut addrs = addrs.into_iter(); if let Some(first) = addrs.next() { let handler = me.behaviour.new_handler().into_node_handler_builder(); - if peer.connect(first, addrs, handler).is_err() { - me.behaviour.inject_dial_failure(&peer_id); + match peer.connect(first, addrs, handler) { + Ok(_) => return Ok(true), + Err(error) => { + log::debug!( + "New dialing attempt to disconnected peer {:?} failed: {:?}.", + peer_id, error); + me.behaviour.inject_dial_failure(&peer_id); + return Err(error) + } } } + Ok(false) }, + Peer::Connected(peer) => { + if let Some(first) = addrs.next() { + let handler = me.behaviour.new_handler().into_node_handler_builder(); + match peer.connect(first, addrs, handler) { + Ok(_) => return Ok(true), + Err(error) => { + log::debug!( + "New dialing attempt to connected peer {:?} failed: {:?}.", + peer_id, error); + me.behaviour.inject_dial_failure(&peer_id); + return Err(error) + } + } + } + Ok(false) + } Peer::Dialing(mut peer) => { - peer.connection().add_addresses(addrs) + peer.connection().add_addresses(addrs); + Ok(false) }, - Peer::Connected(_) | Peer::Local => {} + Peer::Local => { + Err(ConnectionLimit { current: 0, limit: 0 }) + } } } @@ -498,35 +535,29 @@ where TBehaviour: NetworkBehaviour, peer_id, endpoint, }); - } else if num_established.get() == 1 { - this.behaviour.inject_connected(peer_id.clone(), endpoint.clone()); - return Poll::Ready(SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, - }); } else { - // For now, secondary connections are not explicitly reported to - // the behaviour. A behaviour only gets awareness of the - // connections via the events emitted from the connection handlers. - log::trace!("Secondary connection established: {:?}; Total (peer): {}.", + log::debug!("Connection established: {:?}; Total (peer): {}.", connection.connected(), num_established); + let endpoint = connection.endpoint().clone(); + this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint); + if num_established.get() == 1 { + this.behaviour.inject_connected(&peer_id); + } return Poll::Ready(SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, + peer_id, num_established, endpoint }); } }, - Poll::Ready(NetworkEvent::ConnectionError { connected, error, num_established }) => { - log::debug!("Connection {:?} closed by {:?}", connected, error); - let peer_id = connected.peer_id().clone(); + Poll::Ready(NetworkEvent::ConnectionError { id, connected, error, num_established }) => { + log::debug!("Connection {:?} closed: {:?}", connected, error); + let info = connected.info; let endpoint = connected.endpoint; + this.behaviour.inject_connection_closed(info.peer_id(), &id, &endpoint); if num_established == 0 { - this.behaviour.inject_disconnected(&peer_id, endpoint.clone()); + this.behaviour.inject_disconnected(info.peer_id()); } return Poll::Ready(SwarmEvent::ConnectionClosed { - peer_id, + peer_id: info.peer_id().clone(), endpoint, cause: error, num_established, @@ -563,7 +594,10 @@ where TBehaviour: NetworkBehaviour, for addr in addresses.iter() { this.behaviour.inject_expired_listen_addr(addr); } - this.behaviour.inject_listener_closed(listener_id); + this.behaviour.inject_listener_closed(listener_id, match &reason { + Ok(()) => Ok(()), + Err(err) => Err(err), + }); return Poll::Ready(SwarmEvent::ListenerClosed { addresses, reason, @@ -663,12 +697,40 @@ where TBehaviour: NetworkBehaviour, Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => { let _ = ExpandedSwarm::dial_addr(&mut *this, address); }, - Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => { + Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => { if this.banned_peers.contains(&peer_id) { this.behaviour.inject_dial_failure(&peer_id); } else { - ExpandedSwarm::dial(&mut *this, peer_id.clone()); - return Poll::Ready(SwarmEvent::Dialing(peer_id)) + let result = match condition { + DialPeerCondition::Disconnected + if this.network.is_disconnected(&peer_id) => + { + ExpandedSwarm::dial(this, &peer_id) + } + DialPeerCondition::NotDialing + if !this.network.is_dialing(&peer_id) => + { + ExpandedSwarm::dial(this, &peer_id) + } + _ => { + log::trace!("Condition for new dialing attempt to {:?} not met: {:?}", + peer_id, condition); + if let Some(mut peer) = this.network.peer(peer_id.clone()).into_dialing() { + let addrs = this.behaviour.addresses_of_peer(peer.id()); + peer.connection().add_addresses(addrs); + } + Ok(false) + } + }; + match result { + Ok(false) => {}, + Ok(true) => return Poll::Ready(SwarmEvent::Dialing(peer_id)), + Err(err) => { + log::debug!("Initiating dialing attempt to {:?} failed: {:?}", + &peer_id, err); + this.behaviour.inject_dial_failure(&peer_id); + } + } } }, Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => { @@ -922,28 +984,33 @@ impl<'a> PollParameters for SwarmPollParameters<'a> { } } +/// A `SwarmBuilder` provides an API for configuring and constructing a `Swarm`, +/// including the underlying [`Network`]. pub struct SwarmBuilder { local_peer_id: PeerId, transport: BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>, behaviour: TBehaviour, - network: NetworkConfig, + network_config: NetworkConfig, } impl SwarmBuilder where TBehaviour: NetworkBehaviour, TConnInfo: ConnectionInfo + fmt::Debug + Clone + Send + 'static, { - pub fn new(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self + /// Creates a new `SwarmBuilder` from the given transport, behaviour and + /// local peer ID. The `Swarm` with its underlying `Network` is obtained + /// via [`SwarmBuilder::build`]. + pub fn new(transport: TTrans, behaviour: TBehaviour, local_peer_id: PeerId) -> Self where TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, ::OutboundSubstream: Send + 'static, ::Substream: Send + 'static, - TTransport: Transport + Clone + Send + Sync + 'static, - TTransport::Error: Send + Sync + 'static, - TTransport::Listener: Send + 'static, - TTransport::ListenerUpgrade: Send + 'static, - TTransport::Dial: Send + 'static, + TTrans: Transport + Clone + Send + Sync + 'static, + TTrans::Error: Send + Sync + 'static, + TTrans::Listener: Send + 'static, + TTrans::ListenerUpgrade: Send + 'static, + TTrans::Dial: Send + 'static, { let transport = transport .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))) @@ -954,35 +1021,41 @@ where TBehaviour: NetworkBehaviour, local_peer_id, transport, behaviour, - network: NetworkConfig::default(), + network_config: Default::default(), } } - pub fn incoming_limit(mut self, incoming_limit: usize) -> Self { - self.network.set_pending_incoming_limit(incoming_limit); - self - } - - /// Sets the executor to use to spawn background tasks. + /// Configures the `Executor` to use for spawning background tasks. /// - /// By default, uses a threads pool. - pub fn executor(mut self, executor: impl Executor + Send + 'static) -> Self { - self.network.set_executor(Box::new(executor)); + /// By default, unless another executor has been configured, + /// [`SwarmBuilder::build`] will try to set up a `ThreadPool`. + pub fn executor(mut self, e: Box) -> Self { + self.network_config.set_executor(e); self } - /// Shortcut for calling `executor` with an object that calls the given closure. - pub fn executor_fn(mut self, executor: impl Fn(Pin + Send>>) + Send + 'static) -> Self { - struct SpawnImpl(F); - impl + Send>>)> Executor for SpawnImpl { - fn exec(&self, f: Pin + Send>>) { - (self.0)(f) - } - } - self.network.set_executor(Box::new(SpawnImpl(executor))); + /// Configures a limit for the number of simultaneous incoming + /// connection attempts. + pub fn incoming_connection_limit(mut self, n: usize) -> Self { + self.network_config.set_incoming_limit(n); self } + /// Configures a limit for the number of simultaneous outgoing + /// connection attempts. + pub fn outgoing_connection_limit(mut self, n: usize) -> Self { + self.network_config.set_outgoing_limit(n); + self + } + + /// Configures a limit for the number of simultaneous + /// established connections per peer. + pub fn peer_connection_limit(mut self, n: usize) -> Self { + self.network_config.set_established_per_peer_limit(n); + self + } + + /// Builds a `Swarm` with the current configuration. pub fn build(mut self) -> Swarm { let supported_protocols = self.behaviour .new_handler() @@ -992,9 +1065,10 @@ where TBehaviour: NetworkBehaviour, .map(|info| info.protocol_name().to_vec()) .collect(); - // If no executor has been explicitly configured, try to set up - // a thread pool. - if self.network.executor().is_none() { + let mut network_cfg = self.network_config; + + // If no executor has been explicitly configured, try to set up a thread pool. + if network_cfg.executor().is_none() { struct PoolWrapper(ThreadPool); impl Executor for PoolWrapper { fn exec(&self, f: Pin + Send>>) { @@ -1002,21 +1076,17 @@ where TBehaviour: NetworkBehaviour, } } - if let Some(executor) = ThreadPoolBuilder::new() - .name_prefix("libp2p-task-") + match ThreadPoolBuilder::new() + .name_prefix("libp2p-swarm-task-") .create() - .ok() .map(|tp| Box::new(PoolWrapper(tp)) as Box<_>) { - self.network.set_executor(Box::new(executor)); + Ok(executor) => { network_cfg.set_executor(Box::new(executor)); }, + Err(err) => log::warn!("Failed to create executor thread pool: {:?}", err) } } - let network = Network::new( - self.transport, - self.local_peer_id, - self.network, - ); + let network = Network::new(self.transport, self.local_peer_id, network_cfg); ExpandedSwarm { network, @@ -1047,9 +1117,13 @@ impl NetworkBehaviour for DummyBehaviour { Vec::new() } - fn inject_connected(&mut self, _: PeerId, _: libp2p_core::ConnectedPoint) {} + fn inject_connected(&mut self, _: &PeerId) {} - fn inject_disconnected(&mut self, _: &PeerId, _: libp2p_core::ConnectedPoint) {} + fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {} + + fn inject_disconnected(&mut self, _: &PeerId) {} + + fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {} fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: ::OutEvent) {} @@ -1067,9 +1141,9 @@ impl NetworkBehaviour for DummyBehaviour { mod tests { use crate::{DummyBehaviour, SwarmBuilder}; use libp2p_core::{ - identity, PeerId, PublicKey, + identity, transport::dummy::{DummyStream, DummyTransport} }; use libp2p_mplex::Multiplex; @@ -1084,7 +1158,8 @@ mod tests { let transport = DummyTransport::<(PeerId, Multiplex)>::new(); let behaviour = DummyBehaviour {}; let swarm = SwarmBuilder::new(transport, behaviour, id.into()) - .incoming_limit(4).build(); + .incoming_connection_limit(4) + .build(); assert_eq!(swarm.network.incoming_limit(), Some(4)); } diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index 16098918..95722ed9 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -63,7 +63,7 @@ pub use dummy::DummyProtocolsHandler; pub use map_in::MapInEvent; pub use map_out::MapOutEvent; pub use node_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError}; -pub use one_shot::OneShotHandler; +pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; pub use select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect}; /// A handler for a set of protocols used on a connection with a remote. diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index 3191ca9e..8b045061 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -133,10 +133,10 @@ enum Shutdown { /// Error generated by the `NodeHandlerWrapper`. #[derive(Debug)] pub enum NodeHandlerWrapperError { - /// Error generated by the handler. + /// The connection handler encountered an error. Handler(TErr), - /// The connection has been deemed useless and has been closed. - UselessTimeout, + /// The connection keep-alive timeout expired. + KeepAliveTimeout, } impl From for NodeHandlerWrapperError { @@ -152,8 +152,8 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { NodeHandlerWrapperError::Handler(err) => write!(f, "{}", err), - NodeHandlerWrapperError::UselessTimeout => - write!(f, "Node has been closed due to inactivity"), + NodeHandlerWrapperError::KeepAliveTimeout => + write!(f, "Connection closed due to expired keep-alive timeout."), } } } @@ -165,7 +165,7 @@ where fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { NodeHandlerWrapperError::Handler(err) => Some(err), - NodeHandlerWrapperError::UselessTimeout => None, + NodeHandlerWrapperError::KeepAliveTimeout => None, } } } @@ -314,9 +314,9 @@ where if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { match self.shutdown { Shutdown::None => {}, - Shutdown::Asap => return Poll::Ready(Err(NodeHandlerWrapperError::UselessTimeout)), + Shutdown::Asap => return Poll::Ready(Err(NodeHandlerWrapperError::KeepAliveTimeout)), Shutdown::Later(ref mut delay, _) => match Future::poll(Pin::new(delay), cx) { - Poll::Ready(_) => return Poll::Ready(Err(NodeHandlerWrapperError::UselessTimeout)), + Poll::Ready(_) => return Poll::Ready(Err(NodeHandlerWrapperError::KeepAliveTimeout)), Poll::Pending => {} } } diff --git a/swarm/src/protocols_handler/one_shot.rs b/swarm/src/protocols_handler/one_shot.rs index dcb731ca..92d63088 100644 --- a/swarm/src/protocols_handler/one_shot.rs +++ b/swarm/src/protocols_handler/one_shot.rs @@ -54,8 +54,8 @@ where max_dial_negotiated: u32, /// Value to return from `connection_keep_alive`. keep_alive: KeepAlive, - /// After the given duration has elapsed, an inactive connection will shutdown. - inactive_timeout: Duration, + /// The configuration container for the handler + config: OneShotHandlerConfig, } impl @@ -67,7 +67,7 @@ where #[inline] pub fn new( listen_protocol: SubstreamProtocol, - inactive_timeout: Duration + config: OneShotHandlerConfig, ) -> Self { OneShotHandler { listen_protocol, @@ -77,7 +77,7 @@ where dial_negotiated: 0, max_dial_negotiated: 8, keep_alive: KeepAlive::Yes, - inactive_timeout, + config } } @@ -121,7 +121,10 @@ where { #[inline] fn default() -> Self { - OneShotHandler::new(SubstreamProtocol::new(Default::default()), Duration::from_secs(10)) + OneShotHandler::new( + SubstreamProtocol::new(Default::default()), + OneShotHandlerConfig::default() + ) } } @@ -157,7 +160,7 @@ where ) { // If we're shutting down the connection for inactivity, reset the timeout. if !self.keep_alive.is_yes() { - self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); + self.keep_alive = KeepAlive::Until(Instant::now() + self.config.inactive_timeout); } self.events_out.push(out.into()); @@ -172,7 +175,7 @@ where self.dial_negotiated -= 1; if self.dial_negotiated == 0 && self.dial_queue.is_empty() { - self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); + self.keep_alive = KeepAlive::Until(Instant::now() + self.config.inactive_timeout); } self.events_out.push(out.into()); @@ -224,7 +227,8 @@ where self.dial_negotiated += 1; return Poll::Ready( ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.dial_queue.remove(0)), + protocol: SubstreamProtocol::new(self.dial_queue.remove(0)) + .with_timeout(self.config.substream_timeout), info: (), }, ); @@ -236,3 +240,21 @@ where Poll::Pending } } + +/// Configuration parameters for the `OneShotHandler` +#[derive(Debug)] +pub struct OneShotHandlerConfig { + /// After the given duration has elapsed, an inactive connection will shutdown. + pub inactive_timeout: Duration, + /// Timeout duration for each newly opened outbound substream. + pub substream_timeout: Duration, +} + +impl Default for OneShotHandlerConfig { + fn default() -> Self { + let inactive_timeout = Duration::from_secs(10); + let substream_timeout = Duration::from_secs(10); + OneShotHandlerConfig { inactive_timeout, substream_timeout } + } +} + diff --git a/swarm/src/toggle.rs b/swarm/src/toggle.rs index f38e8b06..2ba58e95 100644 --- a/swarm/src/toggle.rs +++ b/swarm/src/toggle.rs @@ -76,15 +76,27 @@ where self.inner.as_mut().map(|b| b.addresses_of_peer(peer_id)).unwrap_or_else(Vec::new) } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + fn inject_connected(&mut self, peer_id: &PeerId) { if let Some(inner) = self.inner.as_mut() { - inner.inject_connected(peer_id, endpoint) + inner.inject_connected(peer_id) } } - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { + fn inject_disconnected(&mut self, peer_id: &PeerId) { if let Some(inner) = self.inner.as_mut() { - inner.inject_disconnected(peer_id, endpoint) + inner.inject_disconnected(peer_id) + } + } + + fn inject_connection_established(&mut self, peer_id: &PeerId, connection: &ConnectionId, endpoint: &ConnectedPoint) { + if let Some(inner) = self.inner.as_mut() { + inner.inject_connection_established(peer_id, connection, endpoint) + } + } + + fn inject_connection_closed(&mut self, peer_id: &PeerId, connection: &ConnectionId, endpoint: &ConnectedPoint) { + if let Some(inner) = self.inner.as_mut() { + inner.inject_connection_closed(peer_id, connection, endpoint) } } diff --git a/transports/dns/Cargo.toml b/transports/dns/Cargo.toml index b93f2c09..d476eac5 100644 --- a/transports/dns/Cargo.toml +++ b/transports/dns/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-dns" edition = "2018" description = "DNS transport implementation for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -10,6 +10,6 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } log = "0.4.1" futures = "0.3.1" diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index b04330da..69d9063e 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-tcp" edition = "2018" description = "TCP/IP transport protocol for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -15,7 +15,7 @@ futures = "0.3.1" futures-timer = "3.0" get_if_addrs = "0.5.3" ipnet = "2.0.0" -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } log = "0.4.1" tokio = { version = "0.2", default-features = false, features = ["tcp"], optional = true } diff --git a/transports/uds/Cargo.toml b/transports/uds/Cargo.toml index 0b1b98d0..5068577c 100644 --- a/transports/uds/Cargo.toml +++ b/transports/uds/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-uds" edition = "2018" description = "Unix domain sockets transport for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [target.'cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))'.dependencies] async-std = { version = "1.0", optional = true } -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } log = "0.4.1" futures = "0.3.1" tokio = { version = "0.2", default-features = false, features = ["uds"], optional = true } diff --git a/transports/wasm-ext/Cargo.toml b/transports/wasm-ext/Cargo.toml index 024d0b0d..299df214 100644 --- a/transports/wasm-ext/Cargo.toml +++ b/transports/wasm-ext/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-wasm-ext" -version = "0.16.2" +version = "0.17.0" authors = ["Pierre Krieger "] edition = "2018" description = "Allows passing in an external transport in a WASM environment" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" js-sys = "0.3.19" -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } parity-send-wrapper = "0.1.0" wasm-bindgen = "0.2.42" wasm-bindgen-futures = "0.4.4" diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 6a0dfe2b..c1d4e03d 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-websocket" edition = "2018" description = "WebSocket transport for libp2p" -version = "0.16.0" +version = "0.17.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -10,14 +10,14 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-tls = "0.6" +async-tls = "0.7.0" bytes = "0.5" either = "1.5.3" futures = "0.3.1" -libp2p-core = { version = "0.16.0", path = "../../core" } +libp2p-core = { version = "0.17.0", path = "../../core" } log = "0.4.8" quicksink = "0.1" -rustls = "0.16" +rustls = "0.17.0" rw-stream-sink = "0.2.0" soketto = { version = "0.3", features = ["deflate"] } url = "2.1" @@ -25,4 +25,4 @@ webpki = "0.21" webpki-roots = "0.18" [dev-dependencies] -libp2p-tcp = { version = "0.16.0", path = "../tcp" } +libp2p-tcp = { version = "0.17.0", path = "../tcp" } diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 830b645d..191636a0 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -308,11 +308,6 @@ where let dns_name = dns_name.expect("for use_tls we have checked that dns_name is some"); trace!("starting TLS handshake with {}", address); let stream = self.tls_config.client.connect(&dns_name, stream) - .map_err(|e| { - // We should never enter here as we passed a `DNSNameRef` to `connect`. - debug!("invalid domain name: {:?}", dns_name); - Error::Tls(e.into()) - })? .map_err(|e| { debug!("TLS handshake with {} failed: {}", address, e); Error::Tls(tls::Error::from(e))