mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-05-16 12:51:21 +00:00
protocols/gossipsub: Revert back to wasm_timer for interval (#2506)
Removed the custom interval implementation and removes support for wasm32-unknown-unknown. See https://github.com/libp2p/rust-libp2p/issues/2497 for details. Co-authored-by: Diva M <divma@protonmail.com> Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
parent
7fc342e6c0
commit
60666f5455
@ -69,6 +69,10 @@
|
||||
- Update to `libp2p-websocket` `v0.34.0`.
|
||||
- Update to `libp2p-yamux` `v0.36.0`.
|
||||
|
||||
- Drop support for gossipsub in the wasm32-unknown-unknown target (see [PR 2506]).
|
||||
|
||||
[PR 2506]: https://github.com/libp2p/rust-libp2p/pull/2506
|
||||
|
||||
## Version 0.42.1 [2022-02-02]
|
||||
|
||||
- Update individual crates.
|
||||
|
@ -81,7 +81,6 @@ libp2p-autonat = { version = "0.2.0", path = "protocols/autonat", optional = tru
|
||||
libp2p-core = { version = "0.32.0", path = "core", default-features = false }
|
||||
libp2p-dcutr = { version = "0.1.0", path = "protocols/dcutr", optional = true }
|
||||
libp2p-floodsub = { version = "0.34.0", path = "protocols/floodsub", optional = true }
|
||||
libp2p-gossipsub = { version = "0.36.0", path = "./protocols/gossipsub", optional = true }
|
||||
libp2p-identify = { version = "0.34.0", path = "protocols/identify", optional = true }
|
||||
libp2p-kad = { version = "0.35.0", path = "protocols/kad", optional = true }
|
||||
libp2p-metrics = { version = "0.4.0", path = "misc/metrics", optional = true }
|
||||
@ -111,6 +110,9 @@ libp2p-mdns = { version = "0.35.0", path = "protocols/mdns", optional = true }
|
||||
libp2p-tcp = { version = "0.32.0", path = "transports/tcp", default-features = false, optional = true }
|
||||
libp2p-websocket = { version = "0.34.0", path = "transports/websocket", optional = true }
|
||||
|
||||
[target.'cfg(not(target_os = "unknown"))'.dependencies]
|
||||
libp2p-gossipsub = { version = "0.36.0", path = "protocols/gossipsub", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
async-std = { version = "1.6.2", features = ["attributes"] }
|
||||
async-trait = "0.1"
|
||||
|
@ -14,7 +14,10 @@
|
||||
|
||||
- Move from `open-metrics-client` to `prometheus-client` (see [PR 2442]).
|
||||
|
||||
- Drop support for gossipsub in wasm32-unknown-unknown target (see [PR 2506]).
|
||||
|
||||
[PR 2442]: https://github.com/libp2p/rust-libp2p/pull/2442
|
||||
[PR 2506]: https://github.com/libp2p/rust-libp2p/pull/2506
|
||||
|
||||
# 0.3.0 [2022-01-27]
|
||||
|
||||
|
@ -21,7 +21,6 @@ dcutr = ["libp2p-dcutr"]
|
||||
[dependencies]
|
||||
libp2p-core = { version = "0.32.0", path = "../../core", default-features = false }
|
||||
libp2p-dcutr = { version = "0.1.0", path = "../../protocols/dcutr", optional = true }
|
||||
libp2p-gossipsub = { version = "0.36.0", path = "../../protocols/gossipsub", optional = true }
|
||||
libp2p-identify = { version = "0.34.0", path = "../../protocols/identify", optional = true }
|
||||
libp2p-kad = { version = "0.35.0", path = "../../protocols/kad", optional = true }
|
||||
libp2p-ping = { version = "0.34.0", path = "../../protocols/ping", optional = true }
|
||||
@ -29,6 +28,9 @@ libp2p-relay = { version = "0.7.0", path = "../../protocols/relay", optional =
|
||||
libp2p-swarm = { version = "0.34.0", path = "../../swarm" }
|
||||
prometheus-client = "0.15.0"
|
||||
|
||||
[target.'cfg(not(target_os = "unknown"))'.dependencies]
|
||||
libp2p-gossipsub = { version = "0.36.0", path = "../../protocols/gossipsub", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.8.1"
|
||||
futures = "0.3.1"
|
||||
|
@ -28,6 +28,7 @@
|
||||
#[cfg(feature = "dcutr")]
|
||||
mod dcutr;
|
||||
#[cfg(feature = "gossipsub")]
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
mod gossipsub;
|
||||
#[cfg(feature = "identify")]
|
||||
mod identify;
|
||||
@ -46,6 +47,7 @@ pub struct Metrics {
|
||||
#[cfg(feature = "dcutr")]
|
||||
dcutr: dcutr::Metrics,
|
||||
#[cfg(feature = "gossipsub")]
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
gossipsub: gossipsub::Metrics,
|
||||
#[cfg(feature = "identify")]
|
||||
identify: identify::Metrics,
|
||||
@ -73,6 +75,7 @@ impl Metrics {
|
||||
#[cfg(feature = "dcutr")]
|
||||
dcutr: dcutr::Metrics::new(sub_registry),
|
||||
#[cfg(feature = "gossipsub")]
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
gossipsub: gossipsub::Metrics::new(sub_registry),
|
||||
#[cfg(feature = "identify")]
|
||||
identify: identify::Metrics::new(sub_registry),
|
||||
|
@ -10,9 +10,12 @@
|
||||
|
||||
- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445).
|
||||
|
||||
- Revert to wasm-timer (see [PR 2506]).
|
||||
|
||||
[PR 2442]: https://github.com/libp2p/rust-libp2p/pull/2442
|
||||
[PR 2481]: https://github.com/libp2p/rust-libp2p/pull/2481
|
||||
[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445
|
||||
[PR 2506]: https://github.com/libp2p/rust-libp2p/pull/2506
|
||||
|
||||
# 0.35.0 [2022-01-27]
|
||||
|
||||
|
@ -27,10 +27,9 @@ smallvec = "1.6.1"
|
||||
prost = "0.9"
|
||||
hex_fmt = "0.3.0"
|
||||
regex = "1.4.0"
|
||||
futures-timer = "3.0.2"
|
||||
pin-project = "1.0.8"
|
||||
instant = "0.1.11"
|
||||
serde = { version = "1", optional = true, features = ["derive"] }
|
||||
wasm-timer = "0.2.5"
|
||||
instant = "0.1.11"
|
||||
# Metrics dependencies
|
||||
prometheus-client = "0.15.0"
|
||||
|
||||
|
@ -20,13 +20,13 @@
|
||||
|
||||
//! Data structure for efficiently storing known back-off's when pruning peers.
|
||||
use crate::topic::TopicHash;
|
||||
use instant::Instant;
|
||||
use libp2p_core::PeerId;
|
||||
use std::collections::{
|
||||
hash_map::{Entry, HashMap},
|
||||
HashSet,
|
||||
};
|
||||
use std::time::Duration;
|
||||
use libp2p_core::PeerId;
|
||||
use std::collections::{
|
||||
hash_map::{Entry, HashMap},
|
||||
HashSet,
|
||||
};
|
||||
use std::time::Duration;
|
||||
use wasm_timer::Instant;
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
struct HeartbeatIndex(usize);
|
||||
|
@ -36,7 +36,6 @@ use prometheus_client::registry::Registry;
|
||||
use prost::Message;
|
||||
use rand::{seq::SliceRandom, thread_rng};
|
||||
|
||||
use instant::Instant;
|
||||
use libp2p_core::{
|
||||
connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4,
|
||||
multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId,
|
||||
@ -45,7 +44,9 @@ use libp2p_swarm::{
|
||||
dial_opts::{self, DialOpts},
|
||||
IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
|
||||
};
|
||||
use wasm_timer::Instant;
|
||||
|
||||
use crate::backoff::BackoffStorage;
|
||||
use crate::config::{GossipsubConfig, ValidationMode};
|
||||
use crate::error::{PublishError, SubscriptionError, ValidationError};
|
||||
use crate::gossip_promises::GossipPromises;
|
||||
@ -63,9 +64,9 @@ use crate::types::{
|
||||
GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage,
|
||||
};
|
||||
use crate::types::{GossipsubRpc, PeerConnections, PeerKind};
|
||||
use crate::{backoff::BackoffStorage, interval::Interval};
|
||||
use crate::{rpc_proto, TopicScoreParams};
|
||||
use std::{cmp::Ordering::Equal, fmt::Debug};
|
||||
use wasm_timer::Interval;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
@ -439,8 +440,8 @@ where
|
||||
config.backoff_slack(),
|
||||
),
|
||||
mcache: MessageCache::new(config.history_gossip(), config.history_length()),
|
||||
heartbeat: Interval::new_initial(
|
||||
config.heartbeat_initial_delay(),
|
||||
heartbeat: Interval::new_at(
|
||||
Instant::now() + config.heartbeat_initial_delay(),
|
||||
config.heartbeat_interval(),
|
||||
),
|
||||
heartbeat_ticks: 0,
|
||||
|
@ -21,10 +21,10 @@
|
||||
use crate::error::ValidationError;
|
||||
use crate::peer_score::RejectReason;
|
||||
use crate::MessageId;
|
||||
use instant::Instant;
|
||||
use libp2p_core::PeerId;
|
||||
use log::debug;
|
||||
use std::collections::HashMap;
|
||||
use wasm_timer::Instant;
|
||||
|
||||
/// Tracks recently sent `IWANT` messages and checks if peers respond to them.
|
||||
#[derive(Default)]
|
||||
|
@ -1,209 +0,0 @@
|
||||
// Copyright 2021 Oliver Wangler <oliver@wngr.de>
|
||||
// Copyright 2019 Pierre Krieger
|
||||
// Copyright (c) 2019 Tokio Contributors
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
// this software and associated documentation files (the "Software"), to deal in
|
||||
// the Software without restriction, including without limitation the rights to
|
||||
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
// the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
// subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in all
|
||||
// copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
|
||||
// OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
||||
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
//
|
||||
// Initial version copied from
|
||||
// https://github.com/tomaka/wasm-timer/blob/8964804eff980dd3eb115b711c57e481ba541708/src/timer/interval.rs
|
||||
// and adapted.
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures_timer::Delay;
|
||||
use instant::Instant;
|
||||
use pin_project::pin_project;
|
||||
|
||||
/// A stream representing notifications at fixed interval
|
||||
///
|
||||
/// Intervals are created through the `Interval::new` or
|
||||
/// `Interval::new_intial` methods indicating when a first notification
|
||||
/// should be triggered and when it will be repeated.
|
||||
///
|
||||
/// Note that intervals are not intended for high resolution timers, but rather
|
||||
/// they will likely fire some granularity after the exact instant that they're
|
||||
/// otherwise indicated to fire at.
|
||||
#[pin_project]
|
||||
#[derive(Debug)]
|
||||
pub struct Interval {
|
||||
#[pin]
|
||||
delay: Delay,
|
||||
interval: Duration,
|
||||
fires_at: Instant,
|
||||
}
|
||||
|
||||
impl Interval {
|
||||
/// Creates a new interval which will fire at `dur` time into the future,
|
||||
/// and will repeat every `dur` interval after
|
||||
///
|
||||
/// The returned object will be bound to the default timer for this thread.
|
||||
/// The default timer will be spun up in a helper thread on first use.
|
||||
pub fn new(dur: Duration) -> Interval {
|
||||
Interval::new_initial(dur, dur)
|
||||
}
|
||||
|
||||
/// Creates a new interval which will fire the first time after the specified `initial_delay`,
|
||||
/// and then will repeat every `dur` interval after.
|
||||
///
|
||||
/// The returned object will be bound to the default timer for this thread.
|
||||
/// The default timer will be spun up in a helper thread on first use.
|
||||
pub fn new_initial(initial_delay: Duration, dur: Duration) -> Interval {
|
||||
let fires_at = Instant::now() + initial_delay;
|
||||
Interval {
|
||||
delay: Delay::new(initial_delay),
|
||||
interval: dur,
|
||||
fires_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Interval {
|
||||
type Item = ();
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
if self.as_mut().project().delay.poll(cx).is_pending() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
let next = next_interval(self.fires_at, Instant::now(), self.interval);
|
||||
self.delay.reset(next);
|
||||
self.fires_at += next;
|
||||
Poll::Ready(Some(()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts Duration object to raw nanoseconds if possible
|
||||
///
|
||||
/// This is useful to divide intervals.
|
||||
///
|
||||
/// While technically for large duration it's impossible to represent any
|
||||
/// duration as nanoseconds, the largest duration we can represent is about
|
||||
/// 427_000 years. Large enough for any interval we would use or calculate in
|
||||
/// tokio.
|
||||
fn duration_to_nanos(dur: Duration) -> Option<u64> {
|
||||
let v = dur.as_secs().checked_mul(1_000_000_000)?;
|
||||
v.checked_add(dur.subsec_nanos() as u64)
|
||||
}
|
||||
|
||||
fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Duration {
|
||||
let new = prev + interval;
|
||||
if new > now {
|
||||
interval
|
||||
} else {
|
||||
let spent_ns =
|
||||
duration_to_nanos(now.duration_since(prev)).expect("interval should be expired");
|
||||
let interval_ns =
|
||||
duration_to_nanos(interval).expect("interval is less that 427 thousand years");
|
||||
let mult = spent_ns / interval_ns + 1;
|
||||
assert!(
|
||||
mult < (1 << 32),
|
||||
"can't skip more than 4 billion intervals of {:?} \
|
||||
(trying to skip {})",
|
||||
interval,
|
||||
mult
|
||||
);
|
||||
interval * mult as u32
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::next_interval;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
struct Timeline(Instant);
|
||||
|
||||
impl Timeline {
|
||||
fn new() -> Timeline {
|
||||
Timeline(Instant::now())
|
||||
}
|
||||
fn at(&self, millis: u64) -> Instant {
|
||||
self.0 + Duration::from_millis(millis)
|
||||
}
|
||||
fn at_ns(&self, sec: u64, nanos: u32) -> Instant {
|
||||
self.0 + Duration::new(sec, nanos)
|
||||
}
|
||||
}
|
||||
|
||||
fn dur(millis: u64) -> Duration {
|
||||
Duration::from_millis(millis)
|
||||
}
|
||||
|
||||
// The math around Instant/Duration isn't 100% precise due to rounding
|
||||
// errors
|
||||
fn almost_eq(a: Instant, b: Instant) -> bool {
|
||||
let diff = match a.cmp(&b) {
|
||||
std::cmp::Ordering::Less => b - a,
|
||||
std::cmp::Ordering::Equal => return true,
|
||||
std::cmp::Ordering::Greater => a - b,
|
||||
};
|
||||
|
||||
diff < Duration::from_millis(1)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn norm_next() {
|
||||
let tm = Timeline::new();
|
||||
assert!(almost_eq(
|
||||
tm.at(1) + next_interval(tm.at(1), tm.at(2), dur(10)),
|
||||
tm.at(11)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
tm.at(7777) + next_interval(tm.at(7777), tm.at(7788), dur(100)),
|
||||
tm.at(7877)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
tm.at(1) + next_interval(tm.at(1), tm.at(1000), dur(2100)),
|
||||
tm.at(2101)
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fast_forward() {
|
||||
let tm = Timeline::new();
|
||||
|
||||
assert!(almost_eq(
|
||||
tm.at(1) + next_interval(tm.at(1), tm.at(1000), dur(10)),
|
||||
tm.at(1001)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
tm.at(7777) + next_interval(tm.at(7777), tm.at(8888), dur(100)),
|
||||
tm.at(8977)
|
||||
));
|
||||
assert!(almost_eq(
|
||||
tm.at(1) + next_interval(tm.at(1), tm.at(10000), dur(2100)),
|
||||
tm.at(10501)
|
||||
));
|
||||
}
|
||||
|
||||
/// TODO: this test actually should be successful, but since we can't
|
||||
/// multiply Duration on anything larger than u32 easily we decided
|
||||
/// to allow it to fail for now
|
||||
#[test]
|
||||
#[should_panic(expected = "can't skip more than 4 billion intervals")]
|
||||
fn large_skip() {
|
||||
let tm = Timeline::new();
|
||||
assert_eq!(
|
||||
tm.0 + next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)),
|
||||
tm.at_ns(25, 1)
|
||||
);
|
||||
}
|
||||
}
|
@ -142,7 +142,6 @@ mod behaviour;
|
||||
mod config;
|
||||
mod gossip_promises;
|
||||
mod handler;
|
||||
mod interval;
|
||||
mod mcache;
|
||||
pub mod metrics;
|
||||
mod peer_score;
|
||||
|
@ -24,12 +24,12 @@
|
||||
use crate::metrics::{Metrics, Penalty};
|
||||
use crate::time_cache::TimeCache;
|
||||
use crate::{MessageId, TopicHash};
|
||||
use instant::Instant;
|
||||
use libp2p_core::PeerId;
|
||||
use log::{debug, trace, warn};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::net::IpAddr;
|
||||
use std::time::Duration;
|
||||
use wasm_timer::Instant;
|
||||
|
||||
mod params;
|
||||
use crate::error::ValidationError;
|
||||
|
@ -21,13 +21,13 @@
|
||||
//! This implements a time-based LRU cache for checking gossipsub message duplicates.
|
||||
|
||||
use fnv::FnvHashMap;
|
||||
use instant::Instant;
|
||||
use std::collections::hash_map::{
|
||||
self,
|
||||
Entry::{Occupied, Vacant},
|
||||
};
|
||||
use std::collections::VecDeque;
|
||||
use std::time::Duration;
|
||||
use wasm_timer::Instant;
|
||||
|
||||
struct ExpiringElement<Element> {
|
||||
/// The element that expires
|
||||
|
@ -68,6 +68,7 @@ pub use libp2p_dns as dns;
|
||||
pub use libp2p_floodsub as floodsub;
|
||||
#[cfg(feature = "gossipsub")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "gossipsub")))]
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
#[doc(inline)]
|
||||
pub use libp2p_gossipsub as gossipsub;
|
||||
#[cfg(feature = "identify")]
|
||||
|
Loading…
x
Reference in New Issue
Block a user