diff --git a/CHANGELOG.md b/CHANGELOG.md index 05e66be6..c49535ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,31 @@ # Version ??? +- `libp2p-kad`: Consider fixed (K_VALUE) amount of peers at closest query + initialization. Unless `KademliaConfig::set_replication_factor` is used change + has no effect. + [PR 1536](https://github.com/libp2p/rust-libp2p/pull/1536) + +# Version 0.18.1 (2020-04-17) + +- `libp2p-swarm`: Make sure inject_dial_failure is called in all situations. + [PR 1549](https://github.com/libp2p/rust-libp2p/pull/1549) + +# Version 0.18.0 (2020-04-09) + +- `libp2p-core`: Treat connection limit errors as pending connection errors. + [PR 1546](https://github.com/libp2p/rust-libp2p/pull/1546) + +- `libp2p-core-derive`: Disambiguate calls to `NetworkBehaviour::inject_event`. + [PR 1543](https://github.com/libp2p/rust-libp2p/pull/1543) + +- `libp2p-floodsub`: Allow sent messages seen as subscribed. + [PR 1520](https://github.com/libp2p/rust-libp2p/pull/1520) + +- `libp2p-kad`: Return peers independent of record existence. + [PR 1544](https://github.com/libp2p/rust-libp2p/pull/1544) + +- `libp2p-wasm-ext`: Fix "parsed is null" errors being thrown. + [PR 1535](https://github.com/libp2p/rust-libp2p/pull/1535) # Version 0.17.0 (2020-04-02) diff --git a/Cargo.toml b/Cargo.toml index d5962524..a5bed3b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p" edition = "2018" description = "Peer-to-peer networking library" -version = "0.17.0" +version = "0.18.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -57,33 +57,33 @@ futures = "0.3.1" multiaddr = { package = "parity-multiaddr", version = "0.8.0", path = "misc/multiaddr" } multihash = "0.10" lazy_static = "1.2" -libp2p-mplex = { version = "0.17.0", path = "muxers/mplex", optional = true } -libp2p-identify = { version = "0.17.0", path = "protocols/identify", optional = true } -libp2p-kad = { version = "0.17.0", path = "protocols/kad", optional = true } -libp2p-floodsub = { version = "0.17.0", path = "protocols/floodsub", optional = true } -libp2p-gossipsub = { version = "0.17.0", path = "./protocols/gossipsub", optional = true } -libp2p-ping = { version = "0.17.0", path = "protocols/ping", optional = true } -libp2p-plaintext = { version = "0.17.0", path = "protocols/plaintext", optional = true } -libp2p-pnet = { version = "0.17.0", path = "protocols/pnet", optional = true } -libp2p-core = { version = "0.17.1", path = "core" } -libp2p-core-derive = { version = "0.17.0", path = "misc/core-derive" } -libp2p-secio = { version = "0.17.0", path = "protocols/secio", default-features = false, optional = true } -libp2p-swarm = { version = "0.17.0", path = "swarm" } -libp2p-uds = { version = "0.17.0", path = "transports/uds", optional = true } -libp2p-wasm-ext = { version = "0.17.0", path = "transports/wasm-ext", optional = true } -libp2p-yamux = { version = "0.17.0", path = "muxers/yamux", optional = true } -libp2p-noise = { version = "0.17.0", path = "protocols/noise", optional = true } +libp2p-mplex = { version = "0.18.0", path = "muxers/mplex", optional = true } +libp2p-identify = { version = "0.18.0", path = "protocols/identify", optional = true } +libp2p-kad = { version = "0.18.0", path = "protocols/kad", optional = true } +libp2p-floodsub = { version = "0.18.0", path = "protocols/floodsub", optional = true } +libp2p-gossipsub = { version = "0.18.0", path = "./protocols/gossipsub", optional = true } +libp2p-ping = { version = "0.18.0", path = "protocols/ping", optional = true } +libp2p-plaintext = { version = "0.18.0", path = "protocols/plaintext", optional = true } +libp2p-pnet = { version = "0.18.0", path = "protocols/pnet", optional = true } +libp2p-core = { version = "0.18.0", path = "core" } +libp2p-core-derive = { version = "0.18.0", path = "misc/core-derive" } +libp2p-secio = { version = "0.18.0", path = "protocols/secio", default-features = false, optional = true } +libp2p-swarm = { version = "0.18.1", path = "swarm" } +libp2p-uds = { version = "0.18.0", path = "transports/uds", optional = true } +libp2p-wasm-ext = { version = "0.18.0", path = "transports/wasm-ext", optional = true } +libp2p-yamux = { version = "0.18.0", path = "muxers/yamux", optional = true } +libp2p-noise = { version = "0.18.0", path = "protocols/noise", optional = true } parking_lot = "0.10.0" pin-project = "0.4.6" smallvec = "1.0" wasm-timer = "0.2.4" [target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies] -libp2p-deflate = { version = "0.17.0", path = "protocols/deflate", optional = true } -libp2p-dns = { version = "0.17.0", path = "transports/dns", optional = true } -libp2p-mdns = { version = "0.17.0", path = "protocols/mdns", optional = true } -libp2p-tcp = { version = "0.17.0", path = "transports/tcp", optional = true } -libp2p-websocket = { version = "0.17.0", path = "transports/websocket", optional = true } +libp2p-deflate = { version = "0.18.0", path = "protocols/deflate", optional = true } +libp2p-dns = { version = "0.18.0", path = "transports/dns", optional = true } +libp2p-mdns = { version = "0.18.0", path = "protocols/mdns", optional = true } +libp2p-tcp = { version = "0.18.0", path = "transports/tcp", optional = true } +libp2p-websocket = { version = "0.18.0", path = "transports/websocket", optional = true } [dev-dependencies] async-std = "1.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index 47f60c04..dd7f0461 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-core" edition = "2018" description = "Core traits and structs of libp2p" -version = "0.17.1" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -40,9 +40,9 @@ ring = { version = "0.16.9", features = ["alloc", "std"], default-features = fal [dev-dependencies] async-std = "1.0" -libp2p-mplex = { version = "0.17.0", path = "../muxers/mplex" } -libp2p-secio = { version = "0.17.0", path = "../protocols/secio" } -libp2p-tcp = { version = "0.17.0", path = "../transports/tcp" } +libp2p-mplex = { version = "0.18.0", path = "../muxers/mplex" } +libp2p-secio = { version = "0.18.0", path = "../protocols/secio" } +libp2p-tcp = { version = "0.18.0", path = "../transports/tcp" } quickcheck = "0.9.0" wasm-timer = "0.2" diff --git a/core/src/connection/error.rs b/core/src/connection/error.rs index 0d291c37..1836965e 100644 --- a/core/src/connection/error.rs +++ b/core/src/connection/error.rs @@ -29,10 +29,6 @@ pub enum ConnectionError { // TODO: Eventually this should also be a custom error? IO(io::Error), - /// The connection was dropped because the connection limit - /// for a peer has been reached. - ConnectionLimit(ConnectionLimit), - /// The connection handler produced an error. Handler(THandlerErr), } @@ -48,8 +44,6 @@ where write!(f, "Connection error: I/O error: {}", err), ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err), - ConnectionError::ConnectionLimit(l) => - write!(f, "Connection error: Connection limit: {}.", l) } } } @@ -63,7 +57,6 @@ where match self { ConnectionError::IO(err) => Some(err), ConnectionError::Handler(err) => Some(err), - ConnectionError::ConnectionLimit(..) => None, } } } @@ -78,6 +71,10 @@ pub enum PendingConnectionError { /// match the one that was expected or is otherwise invalid. InvalidPeerId, + /// The connection was dropped because the connection limit + /// for a peer has been reached. + ConnectionLimit(ConnectionLimit), + /// An I/O error occurred on the connection. // TODO: Eventually this should also be a custom error? IO(io::Error), @@ -96,6 +93,8 @@ where write!(f, "Pending connection: Transport error: {}", err), PendingConnectionError::InvalidPeerId => write!(f, "Pending connection: Invalid peer ID."), + PendingConnectionError::ConnectionLimit(l) => + write!(f, "Connection error: Connection limit: {}.", l), } } } @@ -110,6 +109,7 @@ where PendingConnectionError::IO(err) => Some(err), PendingConnectionError::Transport(err) => Some(err), PendingConnectionError::InvalidPeerId => None, + PendingConnectionError::ConnectionLimit(..) => None, } } } diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 56e11236..f8c9e1a7 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -112,7 +112,7 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC error: PendingConnectionError, /// The handler that was supposed to handle the connection, /// if the connection failed before the handler was consumed. - handler: THandler, + handler: Option, /// The (expected) peer of the failed connection. peer: Option, /// A reference to the pool that managed the connection. @@ -558,7 +558,7 @@ where id, endpoint, error, - handler, + handler: Some(handler), peer, pool: self }) @@ -588,13 +588,13 @@ where .map_or(0, |conns| conns.len()); if let Err(e) = self.limits.check_established(current) { let connected = entry.close(); - let num_established = u32::try_from(e.current).unwrap(); - return Poll::Ready(PoolEvent::ConnectionError { + return Poll::Ready(PoolEvent::PendingConnectionError { id, - connected, - error: ConnectionError::ConnectionLimit(e), - num_established, - pool: self, + endpoint: connected.endpoint, + error: PendingConnectionError::ConnectionLimit(e), + handler: None, + peer, + pool: self }) } // Peer ID checks must already have happened. See `add_pending`. diff --git a/core/src/network.rs b/core/src/network.rs index 10b6e063..ccb21e84 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -512,7 +512,7 @@ fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TP id: ConnectionId, endpoint: ConnectedPoint, error: PendingConnectionError, - handler: THandler, + handler: Option, ) -> (Option>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>) where TTrans: Transport, @@ -533,22 +533,29 @@ where let num_remain = u32::try_from(attempt.next.len()).unwrap(); let failed_addr = attempt.current.clone(); - let opts = + let (opts, attempts_remaining) = if num_remain > 0 { - let next_attempt = attempt.next.remove(0); - let opts = DialingOpts { - peer: peer_id.clone(), - handler, - address: next_attempt, - remaining: attempt.next - }; - Some(opts) + if let Some(handler) = handler { + let next_attempt = attempt.next.remove(0); + let opts = DialingOpts { + peer: peer_id.clone(), + handler, + address: next_attempt, + remaining: attempt.next + }; + (Some(opts), num_remain) + } else { + // The error is "fatal" for the dialing attempt, since + // the handler was already consumed. All potential + // remaining connection attempts are thus void. + (None, 0) + } } else { - None + (None, 0) }; (opts, NetworkEvent::DialError { - attempts_remaining: num_remain, + attempts_remaining, peer_id, multiaddr: failed_addr, error, @@ -560,7 +567,6 @@ where (None, NetworkEvent::UnknownPeerDialError { multiaddr: address, error, - handler, }), ConnectedPoint::Listener { local_addr, send_back_addr } => (None, NetworkEvent::IncomingConnectionError { diff --git a/core/src/network/event.rs b/core/src/network/event.rs index afbedcf7..a63dc479 100644 --- a/core/src/network/event.rs +++ b/core/src/network/event.rs @@ -146,10 +146,6 @@ where /// The error that happened. error: PendingConnectionError, - - /// The handler that was passed to `dial()`, if the - /// connection failed before the handler was consumed. - handler: THandler, }, /// An established connection produced an event. diff --git a/misc/core-derive/Cargo.toml b/misc/core-derive/Cargo.toml index b181d745..166156cd 100644 --- a/misc/core-derive/Cargo.toml +++ b/misc/core-derive/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-core-derive" edition = "2018" description = "Procedural macros of libp2p-core" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -17,4 +17,4 @@ syn = { version = "1.0.8", default-features = false, features = ["clone-impls", quote = "1.0" [dev-dependencies] -libp2p = { version = "0.17.0", path = "../.." } +libp2p = { version = "0.18.0", path = "../.." } diff --git a/misc/core-derive/src/lib.rs b/misc/core-derive/src/lib.rs index 1153276f..d7d28f15 100644 --- a/misc/core-derive/src/lib.rs +++ b/misc/core-derive/src/lib.rs @@ -293,8 +293,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { } Some(match field.ident { - Some(ref i) => quote!{ #elem => self.#i.inject_event(peer_id, connection_id, ev) }, - None => quote!{ #elem => self.#field_n.inject_event(peer_id, connection_id, ev) }, + Some(ref i) => quote!{ #elem => #trait_to_impl::inject_event(&mut self.#i, peer_id, connection_id, ev) }, + None => quote!{ #elem => #trait_to_impl::inject_event(&mut self.#field_n, peer_id, connection_id, ev) }, }) }); diff --git a/misc/core-derive/tests/test.rs b/misc/core-derive/tests/test.rs index e0c5cfdf..b65fb16e 100644 --- a/misc/core-derive/tests/test.rs +++ b/misc/core-derive/tests/test.rs @@ -204,3 +204,35 @@ fn where_clause() { bar: T, } } + +#[test] +fn nested_derives_with_import() { + use libp2p::swarm::NetworkBehaviourEventProcess; + + #[allow(dead_code)] + #[derive(NetworkBehaviour)] + struct Foo { + ping: libp2p::ping::Ping, + } + + #[allow(dead_code)] + #[derive(NetworkBehaviour)] + struct Bar { + foo: Foo, + } + + impl NetworkBehaviourEventProcess for Foo { + fn inject_event(&mut self, _: libp2p::ping::PingEvent) { + } + } + + impl NetworkBehaviourEventProcess<()> for Bar { + fn inject_event(&mut self, _: ()) { + } + } + + #[allow(dead_code)] + fn bar() { + require_net_behaviour::(); + } +} diff --git a/misc/peer-id-generator/Cargo.toml b/misc/peer-id-generator/Cargo.toml index aa6d19b6..f3efec74 100644 --- a/misc/peer-id-generator/Cargo.toml +++ b/misc/peer-id-generator/Cargo.toml @@ -11,5 +11,5 @@ categories = ["network-programming", "asynchronous"] publish = false [dependencies] -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } num_cpus = "1.8" diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index b661e38f..4be96b7e 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-mplex" edition = "2018" description = "Mplex multiplexing protocol for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,11 +14,11 @@ bytes = "0.5" fnv = "1.0" futures = "0.3.1" futures_codec = "0.3.4" -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } log = "0.4" parking_lot = "0.10" unsigned-varint = { version = "0.3", features = ["futures-codec"] } [dev-dependencies] async-std = "1.0" -libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } +libp2p-tcp = { version = "0.18.0", path = "../../transports/tcp" } diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index a79d49f7..4255bea4 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-yamux" edition = "2018" description = "Yamux multiplexing protocol for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } parking_lot = "0.10" thiserror = "1.0" yamux = "0.4.5" diff --git a/protocols/deflate/Cargo.toml b/protocols/deflate/Cargo.toml index c4aa2262..0d2552f5 100644 --- a/protocols/deflate/Cargo.toml +++ b/protocols/deflate/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-deflate" edition = "2018" description = "Deflate encryption protocol for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,11 +11,11 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } flate2 = "1.0" [dev-dependencies] async-std = "1.0" -libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } +libp2p-tcp = { version = "0.18.0", path = "../../transports/tcp" } rand = "0.7" quickcheck = "0.9" diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index f1ea3a0b..4c8fa279 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-floodsub" edition = "2018" description = "Floodsub protocol for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,8 +13,8 @@ categories = ["network-programming", "asynchronous"] cuckoofilter = "0.3.2" fnv = "1.0" futures = "0.3.1" -libp2p-core = { version = "0.17.0", path = "../../core" } -libp2p-swarm = { version = "0.17.0", path = "../../swarm" } +libp2p-core = { version = "0.18.0", path = "../../core" } +libp2p-swarm = { version = "0.18.0", path = "../../swarm" } prost = "0.6.1" rand = "0.7" smallvec = "1.0" diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 1c837b2d..4bb6aa08 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -18,8 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; +use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; use crate::topic::Topic; +use crate::FloodsubConfig; use cuckoofilter::CuckooFilter; use fnv::FnvHashSet; use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId}; @@ -43,8 +44,7 @@ pub struct Floodsub { /// Events that need to be yielded to the outside when polling. events: VecDeque>, - /// Peer id of the local node. Used for the source of the messages that we publish. - local_peer_id: PeerId, + config: FloodsubConfig, /// List of peers to send messages to. target_peers: FnvHashSet, @@ -64,11 +64,16 @@ pub struct Floodsub { } impl Floodsub { - /// Creates a `Floodsub`. + /// Creates a `Floodsub` with default configuration. pub fn new(local_peer_id: PeerId) -> Self { + Self::from_config(FloodsubConfig::new(local_peer_id)) + } + + /// Creates a `Floodsub` with the given configuration. + pub fn from_config(config: FloodsubConfig) -> Self { Floodsub { events: VecDeque::new(), - local_peer_id, + config, target_peers: FnvHashSet::default(), connected_peers: HashMap::new(), subscribed_topics: SmallVec::new(), @@ -190,7 +195,7 @@ impl Floodsub { fn publish_many_inner(&mut self, topic: impl IntoIterator>, data: impl Into>, check_self_subscriptions: bool) { let message = FloodsubMessage { - source: self.local_peer_id.clone(), + source: self.config.local_peer_id.clone(), data: data.into(), // If the sequence numbers are predictable, then an attacker could flood the network // with packets with the predetermined sequence numbers and absorb our legitimate @@ -202,6 +207,10 @@ impl Floodsub { let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)); if self_subscribed { self.received.add(&message); + if self.config.subscribe_local_messages { + self.events.push_back( + NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone()))); + } } // Don't publish the message if we have to check subscriptions // and we're not subscribed ourselves to any of the topics. @@ -228,7 +237,7 @@ impl Floodsub { } impl NetworkBehaviour for Floodsub { - type ProtocolsHandler = OneShotHandler; + type ProtocolsHandler = OneShotHandler; type OutEvent = FloodsubEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index cc9e840a..8e7014be 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -21,6 +21,8 @@ //! Implements the floodsub protocol, see also the: //! [spec](https://github.com/libp2p/specs/tree/master/pubsub). +use libp2p_core::PeerId; + pub mod protocol; mod layer; @@ -33,3 +35,22 @@ mod rpc_proto { pub use self::layer::{Floodsub, FloodsubEvent}; pub use self::protocol::{FloodsubMessage, FloodsubRpc}; pub use self::topic::Topic; + +/// Configuration options for the Floodsub protocol. +pub struct FloodsubConfig { + /// Peer id of the local node. Used for the source of the messages that we publish. + pub local_peer_id: PeerId, + + /// `true` if messages published by local node should be propagated as messages received from + /// the network, `false` by default. + pub subscribe_local_messages: bool, +} + +impl FloodsubConfig { + pub fn new(local_peer_id: PeerId) -> Self { + Self { + local_peer_id, + subscribe_local_messages: false + } + } +} diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 4df3975e..046c72d8 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -27,16 +27,16 @@ use futures::{Future, io::{AsyncRead, AsyncWrite}}; /// Implementation of `ConnectionUpgrade` for the floodsub protocol. #[derive(Debug, Clone, Default)] -pub struct FloodsubConfig {} +pub struct FloodsubProtocol {} -impl FloodsubConfig { - /// Builds a new `FloodsubConfig`. - pub fn new() -> FloodsubConfig { - FloodsubConfig {} +impl FloodsubProtocol { + /// Builds a new `FloodsubProtocol`. + pub fn new() -> FloodsubProtocol { + FloodsubProtocol {} } } -impl UpgradeInfo for FloodsubConfig { +impl UpgradeInfo for FloodsubProtocol { type Info = &'static [u8]; type InfoIter = iter::Once; @@ -45,7 +45,7 @@ impl UpgradeInfo for FloodsubConfig { } } -impl InboundUpgrade for FloodsubConfig +impl InboundUpgrade for FloodsubProtocol where TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, { diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 8826b9b1..8f1b297d 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-gossipsub" edition = "2018" description = "Gossipsub protocol for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Age Manning "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -10,8 +10,8 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-swarm = { version = "0.17.0", path = "../../swarm" } -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-swarm = { version = "0.18.0", path = "../../swarm" } +libp2p-core = { version = "0.18.0", path = "../../core" } bytes = "0.5.4" byteorder = "1.3.2" fnv = "1.0.6" @@ -30,8 +30,8 @@ prost = "0.6.1" [dev-dependencies] async-std = "1.4.0" env_logger = "0.7.1" -libp2p-plaintext = { version = "0.17.0", path = "../plaintext" } -libp2p-yamux = { version = "0.17.0", path = "../../muxers/yamux" } +libp2p-plaintext = { version = "0.18.0", path = "../plaintext" } +libp2p-yamux = { version = "0.18.0", path = "../../muxers/yamux" } quickcheck = "0.9.2" [build-dependencies] diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 07aa7500..31fb5e9a 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-identify" edition = "2018" description = "Nodes identifcation protocol for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,8 +11,8 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.17.0", path = "../../core" } -libp2p-swarm = { version = "0.17.0", path = "../../swarm" } +libp2p-core = { version = "0.18.0", path = "../../core" } +libp2p-swarm = { version = "0.18.0", path = "../../swarm" } log = "0.4.1" prost = "0.6.1" smallvec = "1.0" @@ -20,9 +20,9 @@ wasm-timer = "0.2" [dev-dependencies] async-std = "1.0" -libp2p-mplex = { version = "0.17.0", path = "../../muxers/mplex" } -libp2p-secio = { version = "0.17.0", path = "../../protocols/secio" } -libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } +libp2p-mplex = { version = "0.18.0", path = "../../muxers/mplex" } +libp2p-secio = { version = "0.18.0", path = "../../protocols/secio" } +libp2p-tcp = { version = "0.18.0", path = "../../transports/tcp" } [build-dependencies] prost-build = "0.6" diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 6e914a90..ede25120 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-kad" edition = "2018" description = "Kademlia protocol for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -17,8 +17,8 @@ fnv = "1.0" futures_codec = "0.3.4" futures = "0.3.1" log = "0.4" -libp2p-core = { version = "0.17.0", path = "../../core" } -libp2p-swarm = { version = "0.17.0", path = "../../swarm" } +libp2p-core = { version = "0.18.0", path = "../../core" } +libp2p-swarm = { version = "0.18.0", path = "../../swarm" } multihash = "0.10" prost = "0.6.1" rand = "0.7.2" @@ -34,8 +34,8 @@ derivative = "2.0.2" trust-graph = { git = "ssh://git@github.com/fluencelabs/arqada.git", branch = "master" } [dev-dependencies] -libp2p-secio = { version = "0.17.0", path = "../secio" } -libp2p-yamux = { version = "0.17.0", path = "../../muxers/yamux" } +libp2p-secio = { version = "0.18.0", path = "../secio" } +libp2p-yamux = { version = "0.18.0", path = "../../muxers/yamux" } quickcheck = "0.9.0" [build-dependencies] diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 1009b52d..8f467408 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1426,13 +1426,7 @@ where None => None }; - // If no record is found, at least report known closer peers. - let closer_peers = - if record.is_none() { - self.find_closest(&kbucket::Key::new(key), &source) - } else { - Vec::new() - }; + let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: source, diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index fcaa6875..7ad85b96 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -22,7 +22,7 @@ use super::*; -use crate::K_VALUE; +use crate::{ALPHA_VALUE, K_VALUE}; use crate::kbucket::Distance; use crate::record::store::MemoryStore; use futures::{ @@ -35,7 +35,7 @@ use libp2p_core::{ Transport, identity, transport::MemoryTransport, - multiaddr::{Protocol, multiaddr}, + multiaddr::{Protocol, Multiaddr, multiaddr}, muxing::StreamMuxerBox, upgrade }; @@ -50,64 +50,86 @@ use libp2p_core::identity::ed25519; type TestSwarm = Swarm>; +fn build_node() -> (Multiaddr, TestSwarm) { + build_node_with_config(Default::default()) +} + +fn build_node_with_config(cfg: KademliaConfig) -> (Ed25519::Keypair, Multiaddr, TestSwarm) { + let ed25519_key = ed25519::Keypair::generate(); + let local_key = identity::Keypair::Ed25519(ed25519_key.clone()); + let local_public_key = local_key.public(); + let transport = MemoryTransport::default() + .upgrade(upgrade::Version::V1) + .authenticate(SecioConfig::new(local_key)) + .multiplex(yamux::Config::default()) + .map(|(p, m), _| (p, StreamMuxerBox::new(m))) + .map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); }) + .boxed(); + + let local_id = local_public_key.clone().into_peer_id(); + let store = MemoryStore::new(local_id.clone()); + let trust = TrustGraph::new(Vec::new()); + let behaviour = Kademlia::with_config(ed25519_key.clone(), local_id.clone(), store, cfg.clone(), trust); + + let mut swarm = Swarm::new(transport, behaviour, local_id); + + let address: Multiaddr = Protocol::Memory(random::()).into(); + Swarm::listen_on(&mut swarm, address.clone()).unwrap(); + + (ed25519_key, address, swarm) +} + /// Builds swarms, each listening on a port. Does *not* connect the nodes together. -fn build_nodes(num: usize) -> (u64, Vec<(ed25519::Keypair, TestSwarm)>) { +fn build_nodes(num: usize) -> Vec<(Multiaddr, TestSwarm)> { build_nodes_with_config(num, Default::default()) } /// Builds swarms, each listening on a port. Does *not* connect the nodes together. -fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<(ed25519::Keypair, TestSwarm)>) { - let port_base = 1 + random::() % (u64::MAX - num as u64); - let mut result: Vec<(ed25519::Keypair, Swarm<_, _>)> = Vec::with_capacity(num); - - for _ in 0 .. num { - let ed25519_key = ed25519::Keypair::generate(); - let local_key = identity::Keypair::Ed25519(ed25519_key.clone()); - let local_public_key = local_key.public(); - let transport = MemoryTransport::default() - .upgrade(upgrade::Version::V1) - .authenticate(SecioConfig::new(local_key)) - .multiplex(yamux::Config::default()) - .map(|(p, m), _| (p, StreamMuxerBox::new(m))) - .map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); }) - .boxed(); - - let local_id = local_public_key.clone().into_peer_id(); - let store = MemoryStore::new(local_id.clone()); - let trust = TrustGraph::new(Vec::new()); - let behaviour = Kademlia::with_config(ed25519_key.clone(), local_id.clone(), store, cfg.clone(), trust); - result.push((ed25519_key, Swarm::new(transport, behaviour, local_id))); - } - - for (i, (_, s)) in result.iter_mut().enumerate() { - Swarm::listen_on(s, Protocol::Memory(port_base + i as u64).into()).unwrap(); - } - - (port_base, result) +fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> Vec<(Multiaddr, TestSwarm)> { + (0..num).map(|_| build_node_with_config(cfg.clone())).collect() } -fn build_connected_nodes(total: usize, step: usize) -> (Vec, Vec<(ed25519::Keypair, TestSwarm)>) { +fn build_connected_nodes(total: usize, step: usize) -> Vec<(Multiaddr, TestSwarm)> { build_connected_nodes_with_config(total, step, Default::default()) } fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaConfig) - -> (Vec, Vec<(ed25519::Keypair, TestSwarm)>) + -> Vec<(Multiaddr, TestSwarm)> { - let (port_base, mut swarms) = build_nodes_with_config(total, cfg); - let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect(); + let mut swarms = build_nodes_with_config(total, cfg); + let swarm_ids: Vec<_> = swarms.iter() + .map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone())) + .collect(); let mut i = 0; - for (j, peer) in swarm_ids.iter().enumerate().skip(1) { + for (j, (addr, peer_id)) in swarm_ids.iter().enumerate().skip(1) { if i < swarm_ids.len() { let public = swarms[i].0.public(); - swarms[i].1.add_address(&peer, Protocol::Memory(port_base + j as u64).into(), public); + swarms[i].1.add_address(peer_id, addr.clone(), public); } if j % step == 0 { i += step; } } - (swarm_ids, swarms) + swarms +} + +fn build_fully_connected_nodes_with_config(total: usize, cfg: KademliaConfig) + -> Vec<(Multiaddr, TestSwarm)> +{ + let mut swarms = build_nodes_with_config(total, cfg); + let swarm_addr_and_peer_id: Vec<_> = swarms.iter() + .map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone())) + .collect(); + + for (_addr, swarm) in swarms.iter_mut() { + for (addr, peer) in &swarm_addr_and_peer_id { + swarm.add_address(&peer, addr.clone()); + } + } + + swarms } fn random_multihash() -> Multihash { @@ -118,8 +140,17 @@ fn random_multihash() -> Multihash { fn bootstrap() { fn run(rng: &mut impl Rng) { let num_total = rng.gen_range(2, 20); - let num_group = rng.gen_range(1, num_total); - let (swarm_ids, mut swarms) = build_connected_nodes(num_total, num_group); + // When looking for the closest node to a key, Kademlia considers ALPHA_VALUE nodes to query + // at initialization. If `num_groups` is larger than ALPHA_VALUE the remaining locally known + // nodes will not be considered. Given that no other node is aware of them, they would be + // lost entirely. To prevent the above restrict `num_groups` to be equal or smaller than + // ALPHA_VALUE. + let num_group = rng.gen_range(1, (num_total % ALPHA_VALUE.get()) + 2); + + let mut swarms = build_connected_nodes(num_total, num_group).into_iter() + .map(|(_a, s)| s) + .collect::>(); + let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); swarms[0].1.bootstrap(); @@ -170,7 +201,10 @@ fn query_iter() { fn run(rng: &mut impl Rng) { let num_total = rng.gen_range(2, 20); - let (swarm_ids, mut swarms) = build_connected_nodes(num_total, 1); + let mut swarms = build_connected_nodes(num_total, 1).into_iter() + .map(|(_a, s)| s) + .collect::>(); + let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); // Ask the first peer in the list to search a random peer. The search should // propagate forwards through the list of peers. @@ -222,7 +256,9 @@ fn unresponsive_not_returned_direct() { // Build one node. It contains fake addresses to non-existing nodes. We ask it to find a // random peer. We make sure that no fake address is returned. - let (_, mut swarms) = build_nodes(1); + let mut swarms = build_nodes(1).into_iter() + .map(|(_a, s)| s) + .collect::>(); // Add fake addresses. for _ in 0 .. 10 { @@ -263,18 +299,22 @@ fn unresponsive_not_returned_indirect() { // non-existing nodes. We ask node #2 to find a random peer. We make sure that no fake address // is returned. - let (port_base, mut swarms) = build_nodes(2); + let mut swarms = build_nodes(2); // Add fake addresses to first. - let first_peer_id = Swarm::local_peer_id(&swarms[0].1).clone(); for _ in 0 .. 10 { let public0 = swarms[0].0.public(); - swarms[0].1.add_address(&PeerId::random(), multiaddr![Udp(10u16)], public0); + swarms[0].1.add_address(&PeerId::random(), multiaddr![Udp(10u16)]); } // Connect second to first. + let first_peer_id = Swarm::local_peer_id(&swarms[0].1).clone(); + let first_address = swarms[0].0.clone(); let public1 = swarms[1].0.public(); - swarms[1].1.add_address(&first_peer_id, Protocol::Memory(port_base).into(), public1); + swarms[1].1.add_address(&first_peer_id, first_address); + + // Drop the swarm addresses. + let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>(); // Ask second to search a random value. let search_target = PeerId::random(); @@ -306,14 +346,21 @@ fn unresponsive_not_returned_indirect() { #[test] fn get_record_not_found() { - let (port_base, mut swarms) = build_nodes(3); + let mut swarms = build_nodes(3); - let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect(); + let swarm_ids: Vec<_> = swarms.iter() + .map(|(_addr, swarm)| Swarm::local_peer_id(swarm)) + .cloned() + .collect(); let public0 = swarms[0].0.public(); - swarms[0].1.add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into(), public0); let public1 = swarms[1].0.public(); - swarms[1].1.add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into(), public1); + let (second, third) = (swarms[1].0.clone(), swarms[2].0.clone()); + swarms[0].1.add_address(&swarm_ids[1], second); + swarms[1].1.add_address(&swarm_ids[2], third); + + // Drop the swarm addresses. + let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>(); let target_key = record::Key::from(random_multihash()); swarms[0].1.get_record(&target_key, Quorum::One); @@ -347,16 +394,35 @@ fn get_record_not_found() { ) } +/// A node joining a fully connected network via a single bootnode should be able to put a record to +/// the X closest nodes of the network where X is equal to the configured replication factor. #[test] fn put_record() { fn prop(replication_factor: usize, records: Vec) { let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap(); let num_total = replication_factor.get() * 2; - let num_group = replication_factor.get(); let mut config = KademliaConfig::default(); config.set_replication_factor(replication_factor); - let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config); + + let mut swarms = { + let mut fully_connected_swarms = build_fully_connected_nodes_with_config( + num_total - 1, + config.clone(), + ); + + let mut single_swarm = build_node_with_config(config); + single_swarm.1.add_address( + Swarm::local_peer_id(&fully_connected_swarms[0].1), + fully_connected_swarms[0].0.clone(), + ); + + let mut swarms = vec![single_swarm]; + swarms.append(&mut fully_connected_swarms); + + // Drop the swarm addresses. + swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>() + }; let records = records.into_iter() .take(num_total) @@ -387,7 +453,7 @@ fn put_record() { Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) | Poll::Ready(Some(KademliaEvent::RepublishRecordResult(res))) => { match res { - Err(e) => panic!(e), + Err(e) => panic!("{:?}", e), Ok(ok) => { assert!(records.contains_key(&ok.key)); let record = swarm.store.get(&ok.key).unwrap(); @@ -417,10 +483,14 @@ fn put_record() { assert_eq!(r.key, expected.key); assert_eq!(r.value, expected.value); assert_eq!(r.expires, expected.expires); - assert_eq!(r.publisher.as_ref(), Some(&swarm_ids[0])); + assert_eq!(r.publisher.as_ref(), Some(Swarm::local_peer_id(&swarms[0]))); let key = kbucket::Key::new(r.key.clone()); - let mut expected = swarm_ids.clone().split_off(1); + let mut expected = swarms.iter() + .skip(1) + .map(Swarm::local_peer_id) + .cloned() + .collect::>(); expected.sort_by(|id1, id2| kbucket::Key::new(id1.clone()).distance(&key).cmp( &kbucket::Key::new(id2.clone()).distance(&key))); @@ -430,17 +500,32 @@ fn put_record() { .take(replication_factor.get()) .collect::>(); - let actual = swarms.iter().enumerate().skip(1) - .filter_map(|(i, (_, s))| - if s.store.get(key.preimage()).is_some() { - Some(swarm_ids[i].clone()) + let actual = swarms.iter() + .skip(1) + .filter_map(|(_, swarm)| + if swarm.store.get(key.preimage()).is_some() { + Some(Swarm::local_peer_id(swarm).clone()) } else { None }) .collect::>(); assert_eq!(actual.len(), replication_factor.get()); - assert_eq!(actual, expected); + + let actual_not_expected = actual.difference(&expected) + .collect::>(); + assert!( + actual_not_expected.is_empty(), + "Did not expect records to be stored on nodes {:?}.", + actual_not_expected, + ); + + let expected_not_actual = expected.difference(&actual) + .collect::>(); + assert!(expected_not_actual.is_empty(), + "Expected record to be stored on nodes {:?}.", + expected_not_actual, + ); } if republished { @@ -461,22 +546,27 @@ fn put_record() { ) } - QuickCheck::new().tests(3).quickcheck(prop as fn(_,_)) + QuickCheck::new().tests(3).quickcheck(prop as fn(_,_) -> _) } #[test] fn get_value() { - let (port_base, mut swarms) = build_nodes(3); + let mut swarms = build_nodes(3); - let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect(); + // Let first peer know of second peer and second peer know of third peer. + for i in 0..2 { + let (peer_id, address) = (|(_, s)| s).map(Swarm::local_peer_id(&swarms[i+1].1).clone(), swarms[i+1].0.clone()); + swarms[i].1.add_address(&peer_id, address); + } - let public0 = swarms[0].0.public(); - swarms[0].1.add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into(), public0); - let public1 = swarms[1].0.public(); - swarms[1].1.add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into(), public1); + // Drop the swarm addresses. + let mut swarms = swarms.into_iter().map(|(_key, _addr, swarm)| swarm).collect::>(); let record = Record::new(random_multihash(), vec![4,5,6]); + let public0 = swarms[0].0.public(); + let public1 = swarms[1].0.public(); + swarms[1].1.store.put(record.clone()).unwrap(); swarms[0].1.get_record(&record.key, Quorum::One); @@ -507,7 +597,9 @@ fn get_value() { fn get_value_many() { // TODO: Randomise let num_nodes = 12; - let (_, mut swarms) = build_connected_nodes(num_nodes, num_nodes); + let mut swarms = build_connected_nodes(num_nodes, 3).into_iter() + .map(|(_addr, swarm)| swarm) + .collect::>(); let num_results = 10; let record = Record::new(random_multihash(), vec![4,5,6]); @@ -541,17 +633,36 @@ fn get_value_many() { ) } +/// A node joining a fully connected network via a single bootnode should be able to add itself as a +/// provider to the X closest nodes of the network where X is equal to the configured replication +/// factor. #[test] fn add_provider() { fn prop(replication_factor: usize, keys: Vec) { let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap(); let num_total = replication_factor.get() * 2; - let num_group = replication_factor.get(); let mut config = KademliaConfig::default(); config.set_replication_factor(replication_factor); - let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config); + let mut swarms = { + let mut fully_connected_swarms = build_fully_connected_nodes_with_config( + num_total - 1, + config.clone(), + ); + + let mut single_swarm = build_node_with_config(config); + single_swarm.1.add_address( + Swarm::local_peer_id(&fully_connected_swarms[0].1), + fully_connected_swarms[0].0.clone(), + ); + + let mut swarms = vec![single_swarm]; + swarms.append(&mut fully_connected_swarms); + + // Drop addresses before returning. + swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>() + }; let keys: HashSet<_> = keys.into_iter().take(num_total).collect(); @@ -605,10 +716,10 @@ fn add_provider() { // each key was published to the `replication_factor` closest peers. while let Some(key) = results.pop() { // Collect the nodes that have a provider record for `key`. - let actual = swarms.iter().enumerate().skip(1) - .filter_map(|(i, (_, s))| - if s.store.providers(&key).len() == 1 { - Some(swarm_ids[i].clone()) + let actual = swarms.iter().skip(1) + .filter_map(|(_, swarm)| + if swarm.store.providers(&key).len() == 1 { + Some(Swarm::local_peer_id(&swarm).clone()) } else { None }) @@ -620,7 +731,11 @@ fn add_provider() { return Poll::Pending } - let mut expected = swarm_ids.clone().split_off(1); + let mut expected = swarms.iter() + .skip(1) + .map(Swarm::local_peer_id) + .cloned() + .collect::>(); let kbucket_key = kbucket::Key::new(key); expected.sort_by(|id1, id2| kbucket::Key::new(id1.clone()).distance(&kbucket_key).cmp( @@ -636,8 +751,8 @@ fn add_provider() { // One round of publishing is complete. assert!(results.is_empty()); - for (_, s) in &swarms { - assert_eq!(s.queries.size(), 0); + for (_, swarm) in &swarms { + assert_eq!(swarm.queries.size(), 0); } if republished { @@ -667,19 +782,19 @@ fn add_provider() { /// arithmetic overflow, see https://github.com/libp2p/rust-libp2p/issues/1290. #[test] fn exceed_jobs_max_queries() { - let (_, mut swarms) = build_nodes(1); + let (_addr, mut swarm) = build_node(); let num = JOBS_MAX_QUERIES + 1; for _ in 0 .. num { - swarms[0].1.bootstrap(); + swarm.1.bootstrap(); } - assert_eq!(swarms[0].1.queries.size(), num); + assert_eq!(swarm.1.queries.size(), num); block_on( poll_fn(move |ctx| { for _ in 0 .. num { // There are no other nodes, so the queries finish instantly. - if let Poll::Ready(Some(e)) = swarms[0].1.poll_next_unpin(ctx) { + if let Poll::Ready(Some(e)) = swarm.1.poll_next_unpin(ctx) { if let KademliaEvent::BootstrapResult(r) = e { assert!(r.is_ok(), "Unexpected error") } else { diff --git a/protocols/kad/src/query/peers/closest.rs b/protocols/kad/src/query/peers/closest.rs index 14ccd937..4d53711f 100644 --- a/protocols/kad/src/query/peers/closest.rs +++ b/protocols/kad/src/query/peers/closest.rs @@ -112,7 +112,7 @@ impl ClosestPeersIter { let state = PeerState::NotContacted; (distance, Peer { key, state }) }) - .take(config.num_results)); + .take(K_VALUE.into())); // The iterator initially makes progress by iterating towards the target. let state = State::Iterating { no_progress : 0 }; @@ -401,7 +401,9 @@ impl ClosestPeersIter { /// k closest nodes it has not already queried". fn at_capacity(&self) -> bool { match self.state { - State::Stalled => self.num_waiting >= self.config.num_results, + State::Stalled => self.num_waiting >= usize::max( + self.config.num_results, self.config.parallelism + ), State::Iterating { .. } => self.num_waiting >= self.config.parallelism, State::Finished => true } @@ -726,4 +728,49 @@ mod tests { QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _) } + + #[test] + fn without_success_try_up_to_k_peers() { + fn prop(mut iter: ClosestPeersIter) { + let now = Instant::now(); + + for _ in 0..(usize::min(iter.closest_peers.len(), K_VALUE.get())) { + match iter.next(now) { + PeersIterState::Waiting(Some(p)) => { + let peer = p.clone().into_owned(); + iter.on_failure(&peer); + }, + _ => panic!("Expected iterator to yield another peer to query."), + } + } + + assert_eq!(PeersIterState::Finished, iter.next(now)); + } + + QuickCheck::new().tests(10).quickcheck(prop as fn(_)) + } + + fn stalled_at_capacity() { + fn prop(mut iter: ClosestPeersIter) { + iter.state = State::Stalled; + + for i in 0..usize::max(iter.config.parallelism, iter.config.num_results) { + iter.num_waiting = i; + assert!( + !iter.at_capacity(), + "Iterator should not be at capacity if less than \ + `max(parallelism, num_results)` requests are waiting.", + ) + } + + iter.num_waiting = usize::max(iter.config.parallelism, iter.config.num_results); + assert!( + iter.at_capacity(), + "Iterator should be at capacity if `max(parallelism, num_results)` requests are \ + waiting.", + ) + } + + QuickCheck::new().tests(10).quickcheck(prop as fn(_)) + } } diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 5d21d3fa..a0684422 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "libp2p-mdns" edition = "2018" -version = "0.17.0" +version = "0.18.0" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] license = "MIT" @@ -16,8 +16,8 @@ dns-parser = "0.8" either = "1.5.3" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.17.0", path = "../../core" } -libp2p-swarm = { version = "0.17.0", path = "../../swarm" } +libp2p-core = { version = "0.18.0", path = "../../core" } +libp2p-swarm = { version = "0.18.0", path = "../../swarm" } log = "0.4" net2 = "0.2" rand = "0.7" diff --git a/protocols/noise/Cargo.toml b/protocols/noise/Cargo.toml index 802d5885..50392188 100644 --- a/protocols/noise/Cargo.toml +++ b/protocols/noise/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "libp2p-noise" description = "Cryptographic handshake protocol using the noise framework." -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ edition = "2018" curve25519-dalek = "2.0.0" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } log = "0.4" prost = "0.6.1" rand = "0.7.2" @@ -28,7 +28,7 @@ snow = { version = "0.6.1", features = ["default-resolver"], default-features = [dev-dependencies] env_logger = "0.7.1" -libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } +libp2p-tcp = { version = "0.18.0", path = "../../transports/tcp" } quickcheck = "0.9.0" sodiumoxide = "^0.2.5" diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index 942ae48a..b9323e81 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-ping" edition = "2018" description = "Ping protocol for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,8 +11,8 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.17.0", path = "../../core" } -libp2p-swarm = { version = "0.17.0", path = "../../swarm" } +libp2p-core = { version = "0.18.0", path = "../../core" } +libp2p-swarm = { version = "0.18.0", path = "../../swarm" } log = "0.4.1" rand = "0.7.2" void = "1.0" @@ -20,7 +20,7 @@ wasm-timer = "0.2" [dev-dependencies] async-std = "1.0" -libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } -libp2p-secio = { version = "0.17.0", path = "../../protocols/secio" } -libp2p-yamux = { version = "0.17.0", path = "../../muxers/yamux" } +libp2p-tcp = { version = "0.18.0", path = "../../transports/tcp" } +libp2p-secio = { version = "0.18.0", path = "../../protocols/secio" } +libp2p-yamux = { version = "0.18.0", path = "../../muxers/yamux" } quickcheck = "0.9.0" diff --git a/protocols/plaintext/Cargo.toml b/protocols/plaintext/Cargo.toml index b8df5dc8..07321552 100644 --- a/protocols/plaintext/Cargo.toml +++ b/protocols/plaintext/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-plaintext" edition = "2018" description = "Plaintext encryption dummy protocol for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] bytes = "0.5" futures = "0.3.1" futures_codec = "0.3.4" -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } log = "0.4.8" prost = "0.6.1" rw-stream-sink = "0.2.0" diff --git a/protocols/pnet/Cargo.toml b/protocols/pnet/Cargo.toml index ca55ac1f..6897cfb4 100644 --- a/protocols/pnet/Cargo.toml +++ b/protocols/pnet/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-pnet" edition = "2018" description = "Private swarm support for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/secio/Cargo.toml b/protocols/secio/Cargo.toml index 66448676..a5be9059 100644 --- a/protocols/secio/Cargo.toml +++ b/protocols/secio/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-secio" edition = "2018" description = "Secio encryption protocol for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -16,7 +16,7 @@ ctr = "0.3" futures = "0.3.1" hmac = "0.7.0" lazy_static = "1.2.0" -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } log = "0.4.6" prost = "0.6.1" pin-project = "0.4.6" @@ -48,8 +48,8 @@ aes-all = ["aesni"] [dev-dependencies] async-std = "1.0" criterion = "0.3" -libp2p-mplex = { version = "0.17.0", path = "../../muxers/mplex" } -libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" } +libp2p-mplex = { version = "0.18.0", path = "../../muxers/mplex" } +libp2p-tcp = { version = "0.18.0", path = "../../transports/tcp" } [[bench]] name = "bench" diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index cb37d555..7feaa30a 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-swarm" edition = "2018" description = "The libp2p swarm" -version = "0.17.0" +version = "0.18.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.17.0", path = "../core" } +libp2p-core = { version = "0.18.0", path = "../core" } log = "0.4" rand = "0.7" smallvec = "1.0" @@ -19,6 +19,6 @@ wasm-timer = "0.2" void = "1" [dev-dependencies] -libp2p-mplex = { version = "0.17.0", path = "../muxers/mplex" } +libp2p-mplex = { version = "0.18.0", path = "../muxers/mplex" } quickcheck = "0.9.0" rand = "0.7.2" diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 11bb1a05..1c62d8d9 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -403,6 +403,12 @@ where TBehaviour: NetworkBehaviour, return Err(error) } } + } else { + log::debug!( + "New dialing attempt to disconnected peer {:?} failed: no address.", + peer_id + ); + me.behaviour.inject_dial_failure(&peer_id); } Ok(false) }, @@ -419,6 +425,12 @@ where TBehaviour: NetworkBehaviour, return Err(error) } } + } else { + log::debug!( + "New dialing attempt to disconnected peer {:?} failed: no address.", + peer_id + ); + me.behaviour.inject_dial_failure(&peer_id); } Ok(false) } @@ -427,6 +439,7 @@ where TBehaviour: NetworkBehaviour, Ok(false) }, Peer::Local => { + me.behaviour.inject_dial_failure(&peer_id); Err(ConnectionLimit { current: 0, limit: 0 }) } } @@ -701,34 +714,25 @@ where TBehaviour: NetworkBehaviour, if this.banned_peers.contains(&peer_id) { this.behaviour.inject_dial_failure(&peer_id); } else { - let result = match condition { + let condition_matched = match condition { DialPeerCondition::Disconnected - if this.network.is_disconnected(&peer_id) => - { - ExpandedSwarm::dial(this, &peer_id) - } + if this.network.is_disconnected(&peer_id) => true, DialPeerCondition::NotDialing - if !this.network.is_dialing(&peer_id) => - { - ExpandedSwarm::dial(this, &peer_id) - } - _ => { - log::trace!("Condition for new dialing attempt to {:?} not met: {:?}", - peer_id, condition); - if let Some(mut peer) = this.network.peer(peer_id.clone()).into_dialing() { - let addrs = this.behaviour.addresses_of_peer(peer.id()); - peer.connection().add_addresses(addrs); - } - Ok(false) - } + if !this.network.is_dialing(&peer_id) => true, + _ => false }; - match result { - Ok(false) => {}, - Ok(true) => return Poll::Ready(SwarmEvent::Dialing(peer_id)), - Err(err) => { - log::debug!("Initiating dialing attempt to {:?} failed: {:?}", - &peer_id, err); - this.behaviour.inject_dial_failure(&peer_id); + + if condition_matched { + if let Ok(true) = ExpandedSwarm::dial(this, &peer_id) { + return Poll::Ready(SwarmEvent::Dialing(peer_id)); + } + + } else { + log::trace!("Condition for new dialing attempt to {:?} not met: {:?}", + peer_id, condition); + if let Some(mut peer) = this.network.peer(peer_id.clone()).into_dialing() { + let addrs = this.behaviour.addresses_of_peer(peer.id()); + peer.connection().add_addresses(addrs); } } } diff --git a/transports/dns/Cargo.toml b/transports/dns/Cargo.toml index d476eac5..11d1958f 100644 --- a/transports/dns/Cargo.toml +++ b/transports/dns/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-dns" edition = "2018" description = "DNS transport implementation for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -10,6 +10,6 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } log = "0.4.1" futures = "0.3.1" diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 69d9063e..b1d5d996 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-tcp" edition = "2018" description = "TCP/IP transport protocol for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -15,7 +15,7 @@ futures = "0.3.1" futures-timer = "3.0" get_if_addrs = "0.5.3" ipnet = "2.0.0" -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } log = "0.4.1" tokio = { version = "0.2", default-features = false, features = ["tcp"], optional = true } diff --git a/transports/uds/Cargo.toml b/transports/uds/Cargo.toml index 5068577c..ee83f852 100644 --- a/transports/uds/Cargo.toml +++ b/transports/uds/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-uds" edition = "2018" description = "Unix domain sockets transport for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [target.'cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))'.dependencies] async-std = { version = "1.0", optional = true } -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } log = "0.4.1" futures = "0.3.1" tokio = { version = "0.2", default-features = false, features = ["uds"], optional = true } diff --git a/transports/wasm-ext/Cargo.toml b/transports/wasm-ext/Cargo.toml index 299df214..4661e8b8 100644 --- a/transports/wasm-ext/Cargo.toml +++ b/transports/wasm-ext/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-wasm-ext" -version = "0.17.0" +version = "0.18.0" authors = ["Pierre Krieger "] edition = "2018" description = "Allows passing in an external transport in a WASM environment" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" js-sys = "0.3.19" -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } parity-send-wrapper = "0.1.0" wasm-bindgen = "0.2.42" wasm-bindgen-futures = "0.4.4" diff --git a/transports/wasm-ext/src/websockets.js b/transports/wasm-ext/src/websockets.js index ec868bff..7b96409c 100644 --- a/transports/wasm-ext/src/websockets.js +++ b/transports/wasm-ext/src/websockets.js @@ -33,12 +33,12 @@ export const websocket_transport = () => { // TODO: support dns addresses as well const multiaddr_to_ws = (addr) => { let parsed = addr.match(/^\/(ip4|ip6|dns4|dns6)\/(.*?)\/tcp\/(.*?)\/(ws|wss|x-parity-ws\/(.*)|x-parity-wss\/(.*))$/); - let proto = 'wss'; - if (parsed[4] == 'ws' || parsed[4] == 'x-parity-ws') { - proto = 'ws'; - } - let url = decodeURIComponent(parsed[5] || parsed[6] || ''); if (parsed != null) { + let proto = 'wss'; + if (parsed[4] == 'ws' || parsed[4] == 'x-parity-ws') { + proto = 'ws'; + } + let url = decodeURIComponent(parsed[5] || parsed[6] || ''); if (parsed[1] == 'ip6') { return proto + "://[" + parsed[2] + "]:" + parsed[3] + url; } else { diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index c1d4e03d..33d0419b 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-websocket" edition = "2018" description = "WebSocket transport for libp2p" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ async-tls = "0.7.0" bytes = "0.5" either = "1.5.3" futures = "0.3.1" -libp2p-core = { version = "0.17.0", path = "../../core" } +libp2p-core = { version = "0.18.0", path = "../../core" } log = "0.4.8" quicksink = "0.1" rustls = "0.17.0" @@ -25,4 +25,4 @@ webpki = "0.21" webpki-roots = "0.18" [dev-dependencies] -libp2p-tcp = { version = "0.17.0", path = "../tcp" } +libp2p-tcp = { version = "0.18.0", path = "../tcp" }