mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-04-25 11:02:12 +00:00
Make clippy "happy". (#1950)
* Make clippy "happy". Address all clippy complaints that are not purely stylistic (or even have corner cases with false positives). Ignore all "style" and "pedantic" lints. * Fix tests. * Undo unnecessary API change.
This commit is contained in:
parent
12557a3c86
commit
6499e924a3
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@ -138,7 +138,7 @@ jobs:
|
|||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
with:
|
with:
|
||||||
command: clippy
|
command: clippy
|
||||||
args: -- -A clippy::mutable_key_type -A clippy::type_complexity
|
args: -- -A clippy::type_complexity -A clippy::pedantic -A clippy::style
|
||||||
|
|
||||||
run-benchmarks:
|
run-benchmarks:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
@ -182,13 +182,13 @@ where
|
|||||||
},
|
},
|
||||||
PoolEvent::ConnectionEvent { ref connection, ref event } => {
|
PoolEvent::ConnectionEvent { ref connection, ref event } => {
|
||||||
f.debug_struct("PoolEvent::ConnectionEvent")
|
f.debug_struct("PoolEvent::ConnectionEvent")
|
||||||
.field("peer", connection.peer_id())
|
.field("peer", &connection.peer_id())
|
||||||
.field("event", event)
|
.field("event", event)
|
||||||
.finish()
|
.finish()
|
||||||
},
|
},
|
||||||
PoolEvent::AddressChange { ref connection, ref new_endpoint, ref old_endpoint } => {
|
PoolEvent::AddressChange { ref connection, ref new_endpoint, ref old_endpoint } => {
|
||||||
f.debug_struct("PoolEvent::AddressChange")
|
f.debug_struct("PoolEvent::AddressChange")
|
||||||
.field("peer", connection.peer_id())
|
.field("peer", &connection.peer_id())
|
||||||
.field("new_endpoint", new_endpoint)
|
.field("new_endpoint", new_endpoint)
|
||||||
.field("old_endpoint", old_endpoint)
|
.field("old_endpoint", old_endpoint)
|
||||||
.finish()
|
.finish()
|
||||||
@ -325,8 +325,8 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
|
|||||||
// "established" connection.
|
// "established" connection.
|
||||||
let future = future.and_then({
|
let future = future.and_then({
|
||||||
let endpoint = endpoint.clone();
|
let endpoint = endpoint.clone();
|
||||||
let expected_peer = peer.clone();
|
let expected_peer = peer;
|
||||||
let local_id = self.local_id.clone();
|
let local_id = self.local_id;
|
||||||
move |(peer_id, muxer)| {
|
move |(peer_id, muxer)| {
|
||||||
if let Some(peer) = expected_peer {
|
if let Some(peer) = expected_peer {
|
||||||
if peer != peer_id {
|
if peer != peer_id {
|
||||||
@ -376,7 +376,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
|
|||||||
self.counters.check_max_established_per_peer(self.num_peer_established(&i.peer_id))?;
|
self.counters.check_max_established_per_peer(self.num_peer_established(&i.peer_id))?;
|
||||||
let id = self.manager.add(c, i.clone());
|
let id = self.manager.add(c, i.clone());
|
||||||
self.counters.inc_established(&i.endpoint);
|
self.counters.inc_established(&i.endpoint);
|
||||||
self.established.entry(i.peer_id.clone()).or_default().insert(id, i.endpoint);
|
self.established.entry(i.peer_id).or_default().insert(id, i.endpoint);
|
||||||
Ok(id)
|
Ok(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -667,7 +667,7 @@ impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Add the connection to the pool.
|
// Add the connection to the pool.
|
||||||
let peer = entry.connected().peer_id.clone();
|
let peer = entry.connected().peer_id;
|
||||||
let conns = self.established.entry(peer).or_default();
|
let conns = self.established.entry(peer).or_default();
|
||||||
let num_established = NonZeroU32::new(u32::try_from(conns.len() + 1).unwrap())
|
let num_established = NonZeroU32::new(u32::try_from(conns.len() + 1).unwrap())
|
||||||
.expect("n + 1 is always non-zero; qed");
|
.expect("n + 1 is always non-zero; qed");
|
||||||
@ -786,8 +786,8 @@ impl<TInEvent> EstablishedConnection<'_, TInEvent> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the identity of the connected peer.
|
/// Returns the identity of the connected peer.
|
||||||
pub fn peer_id(&self) -> &PeerId {
|
pub fn peer_id(&self) -> PeerId {
|
||||||
&self.entry.connected().peer_id
|
self.entry.connected().peer_id
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the local connection ID.
|
/// Returns the local connection ID.
|
||||||
@ -842,6 +842,7 @@ where
|
|||||||
I: Iterator<Item = ConnectionId>
|
I: Iterator<Item = ConnectionId>
|
||||||
{
|
{
|
||||||
/// Obtains the next connection, if any.
|
/// Obtains the next connection, if any.
|
||||||
|
#[allow(clippy::should_implement_trait)]
|
||||||
pub fn next(&mut self) -> Option<EstablishedConnection<'_, TInEvent>>
|
pub fn next(&mut self) -> Option<EstablishedConnection<'_, TInEvent>>
|
||||||
{
|
{
|
||||||
while let Some(id) = self.ids.next() {
|
while let Some(id) = self.ids.next() {
|
||||||
|
@ -144,11 +144,10 @@ where
|
|||||||
local_peer_id: PeerId,
|
local_peer_id: PeerId,
|
||||||
config: NetworkConfig,
|
config: NetworkConfig,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let pool_local_id = local_peer_id.clone();
|
|
||||||
Network {
|
Network {
|
||||||
local_peer_id,
|
local_peer_id,
|
||||||
listeners: ListenersStream::new(transport),
|
listeners: ListenersStream::new(transport),
|
||||||
pool: Pool::new(pool_local_id, config.manager_config, config.limits),
|
pool: Pool::new(local_peer_id, config.manager_config, config.limits),
|
||||||
dialing: Default::default(),
|
dialing: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -380,7 +379,7 @@ where
|
|||||||
let event = match self.pool.poll(cx) {
|
let event = match self.pool.poll(cx) {
|
||||||
Poll::Pending => return Poll::Pending,
|
Poll::Pending => return Poll::Pending,
|
||||||
Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => {
|
Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => {
|
||||||
if let hash_map::Entry::Occupied(mut e) = self.dialing.entry(connection.peer_id().clone()) {
|
if let hash_map::Entry::Occupied(mut e) = self.dialing.entry(connection.peer_id()) {
|
||||||
e.get_mut().retain(|s| s.current.0 != connection.id());
|
e.get_mut().retain(|s| s.current.0 != connection.id());
|
||||||
if e.get().is_empty() {
|
if e.get().is_empty() {
|
||||||
e.remove();
|
e.remove();
|
||||||
@ -526,7 +525,7 @@ where
|
|||||||
if let Some(pos) = attempts.iter().position(|s| s.current.0 == id) {
|
if let Some(pos) = attempts.iter().position(|s| s.current.0 == id) {
|
||||||
let attempt = attempts.remove(pos);
|
let attempt = attempts.remove(pos);
|
||||||
let last = attempts.is_empty();
|
let last = attempts.is_empty();
|
||||||
Some((peer.clone(), attempt, last))
|
Some((*peer, attempt, last))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
@ -545,7 +544,7 @@ where
|
|||||||
if let Some(handler) = handler {
|
if let Some(handler) = handler {
|
||||||
let next_attempt = attempt.remaining.remove(0);
|
let next_attempt = attempt.remaining.remove(0);
|
||||||
let opts = DialingOpts {
|
let opts = DialingOpts {
|
||||||
peer: peer_id.clone(),
|
peer: peer_id,
|
||||||
handler,
|
handler,
|
||||||
address: next_attempt,
|
address: next_attempt,
|
||||||
remaining: attempt.remaining
|
remaining: attempt.remaining
|
||||||
|
@ -223,7 +223,7 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
let id = network.dial_peer(DialingOpts {
|
let id = network.dial_peer(DialingOpts {
|
||||||
peer: peer_id.clone(),
|
peer: peer_id,
|
||||||
handler,
|
handler,
|
||||||
address,
|
address,
|
||||||
remaining: remaining.into_iter().collect(),
|
remaining: remaining.into_iter().collect(),
|
||||||
@ -435,7 +435,7 @@ where
|
|||||||
pub fn attempt(&mut self, id: ConnectionId)
|
pub fn attempt(&mut self, id: ConnectionId)
|
||||||
-> Option<DialingAttempt<'_, TInEvent>>
|
-> Option<DialingAttempt<'_, TInEvent>>
|
||||||
{
|
{
|
||||||
if let hash_map::Entry::Occupied(attempts) = self.network.dialing.entry(self.peer_id.clone()) {
|
if let hash_map::Entry::Occupied(attempts) = self.network.dialing.entry(self.peer_id) {
|
||||||
if let Some(pos) = attempts.get().iter().position(|s| s.current.0 == id) {
|
if let Some(pos) = attempts.get().iter().position(|s| s.current.0 == id) {
|
||||||
if let Some(inner) = self.network.pool.get_outgoing(id) {
|
if let Some(inner) = self.network.pool.get_outgoing(id) {
|
||||||
return Some(DialingAttempt { pos, inner, attempts })
|
return Some(DialingAttempt { pos, inner, attempts })
|
||||||
@ -662,7 +662,8 @@ impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Obtains the next dialing connection, if any.
|
/// Obtains the next dialing connection, if any.
|
||||||
pub fn next<'b>(&'b mut self) -> Option<DialingAttempt<'b, TInEvent>> {
|
#[allow(clippy::should_implement_trait)]
|
||||||
|
pub fn next(&mut self) -> Option<DialingAttempt<'_, TInEvent>> {
|
||||||
// If the number of elements reduced, the current `DialingAttempt` has been
|
// If the number of elements reduced, the current `DialingAttempt` has been
|
||||||
// aborted and iteration needs to continue from the previous position to
|
// aborted and iteration needs to continue from the previous position to
|
||||||
// account for the removed element.
|
// account for the removed element.
|
||||||
@ -676,7 +677,7 @@ impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
|
|||||||
return None
|
return None
|
||||||
}
|
}
|
||||||
|
|
||||||
if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(self.peer_id.clone()) {
|
if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(*self.peer_id) {
|
||||||
let id = attempts.get()[self.pos].current.0;
|
let id = attempts.get()[self.pos].current.0;
|
||||||
if let Some(inner) = self.pool.get_outgoing(id) {
|
if let Some(inner) = self.pool.get_outgoing(id) {
|
||||||
let conn = DialingAttempt { pos: self.pos, inner, attempts };
|
let conn = DialingAttempt { pos: self.pos, inner, attempts };
|
||||||
@ -697,7 +698,7 @@ impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
|
|||||||
return None
|
return None
|
||||||
}
|
}
|
||||||
|
|
||||||
if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(self.peer_id.clone()) {
|
if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(*self.peer_id) {
|
||||||
let id = attempts.get()[self.pos].current.0;
|
let id = attempts.get()[self.pos].current.0;
|
||||||
if let Some(inner) = self.pool.get_outgoing(id) {
|
if let Some(inner) = self.pool.get_outgoing(id) {
|
||||||
return Some(DialingAttempt { pos: self.pos, inner, attempts })
|
return Some(DialingAttempt { pos: self.pos, inner, attempts })
|
||||||
|
@ -69,7 +69,7 @@ fn deny_incoming_connec() {
|
|||||||
multiaddr,
|
multiaddr,
|
||||||
error: PendingConnectionError::Transport(_)
|
error: PendingConnectionError::Transport(_)
|
||||||
}) => {
|
}) => {
|
||||||
assert_eq!(peer_id, *swarm1.local_peer_id());
|
assert_eq!(&peer_id, swarm1.local_peer_id());
|
||||||
assert_eq!(multiaddr, address);
|
assert_eq!(multiaddr, address);
|
||||||
return Poll::Ready(Ok(()));
|
return Poll::Ready(Ok(()));
|
||||||
},
|
},
|
||||||
|
@ -321,7 +321,7 @@ where
|
|||||||
|
|
||||||
// Remove the substream, scheduling pending frames as necessary.
|
// Remove the substream, scheduling pending frames as necessary.
|
||||||
match self.substreams.remove(&id) {
|
match self.substreams.remove(&id) {
|
||||||
None => return,
|
None => {},
|
||||||
Some(state) => {
|
Some(state) => {
|
||||||
// If we fell below the substream limit, notify tasks that had
|
// If we fell below the substream limit, notify tasks that had
|
||||||
// interest in opening an outbound substream earlier.
|
// interest in opening an outbound substream earlier.
|
||||||
@ -442,7 +442,7 @@ where
|
|||||||
// Read the next frame.
|
// Read the next frame.
|
||||||
match ready!(self.poll_read_frame(cx, Some(id)))? {
|
match ready!(self.poll_read_frame(cx, Some(id)))? {
|
||||||
Frame::Data { data, stream_id } if stream_id.into_local() == id => {
|
Frame::Data { data, stream_id } if stream_id.into_local() == id => {
|
||||||
return Poll::Ready(Ok(Some(data.clone())))
|
return Poll::Ready(Ok(Some(data)))
|
||||||
},
|
},
|
||||||
Frame::Data { stream_id, data } => {
|
Frame::Data { stream_id, data } => {
|
||||||
// The data frame is for a different stream than the one
|
// The data frame is for a different stream than the one
|
||||||
@ -595,18 +595,16 @@ where
|
|||||||
// this task again to have a chance at progress.
|
// this task again to have a chance at progress.
|
||||||
trace!("{}: No task to read from blocked stream. Waking current task.", self.id);
|
trace!("{}: No task to read from blocked stream. Waking current task.", self.id);
|
||||||
cx.waker().clone().wake();
|
cx.waker().clone().wake();
|
||||||
|
} else if let Some(id) = stream_id {
|
||||||
|
// We woke some other task, but are still interested in
|
||||||
|
// reading `Data` frames from the current stream when unblocked.
|
||||||
|
debug_assert!(blocked_id != &id, "Unexpected attempt at reading a new \
|
||||||
|
frame from a substream with a full buffer.");
|
||||||
|
let _ = NotifierRead::register_read_stream(&self.notifier_read, cx.waker(), id);
|
||||||
} else {
|
} else {
|
||||||
if let Some(id) = stream_id {
|
// We woke some other task but are still interested in
|
||||||
// We woke some other task, but are still interested in
|
// reading new `Open` frames when unblocked.
|
||||||
// reading `Data` frames from the current stream when unblocked.
|
let _ = NotifierRead::register_next_stream(&self.notifier_read, cx.waker());
|
||||||
debug_assert!(blocked_id != &id, "Unexpected attempt at reading a new \
|
|
||||||
frame from a substream with a full buffer.");
|
|
||||||
let _ = NotifierRead::register_read_stream(&self.notifier_read, cx.waker(), id);
|
|
||||||
} else {
|
|
||||||
// We woke some other task but are still interested in
|
|
||||||
// reading new `Open` frames when unblocked.
|
|
||||||
let _ = NotifierRead::register_next_stream(&self.notifier_read, cx.waker());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return Poll::Pending
|
return Poll::Pending
|
||||||
@ -932,7 +930,7 @@ impl NotifierRead {
|
|||||||
|
|
||||||
impl ArcWake for NotifierRead {
|
impl ArcWake for NotifierRead {
|
||||||
fn wake_by_ref(this: &Arc<Self>) {
|
fn wake_by_ref(this: &Arc<Self>) {
|
||||||
let wakers = mem::replace(&mut *this.read_stream.lock(), Default::default());
|
let wakers = mem::take(&mut *this.read_stream.lock());
|
||||||
for (_, waker) in wakers {
|
for (_, waker) in wakers {
|
||||||
waker.wake();
|
waker.wake();
|
||||||
}
|
}
|
||||||
@ -963,7 +961,7 @@ impl NotifierWrite {
|
|||||||
|
|
||||||
impl ArcWake for NotifierWrite {
|
impl ArcWake for NotifierWrite {
|
||||||
fn wake_by_ref(this: &Arc<Self>) {
|
fn wake_by_ref(this: &Arc<Self>) {
|
||||||
let wakers = mem::replace(&mut *this.pending.lock(), Default::default());
|
let wakers = mem::take(&mut *this.pending.lock());
|
||||||
for waker in wakers {
|
for waker in wakers {
|
||||||
waker.wake();
|
waker.wake();
|
||||||
}
|
}
|
||||||
@ -985,7 +983,7 @@ impl NotifierOpen {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn wake_all(&mut self) {
|
fn wake_all(&mut self) {
|
||||||
let wakers = mem::replace(&mut self.pending, Default::default());
|
let wakers = mem::take(&mut self.pending);
|
||||||
for waker in wakers {
|
for waker in wakers {
|
||||||
waker.wake();
|
waker.wake();
|
||||||
}
|
}
|
||||||
|
@ -106,7 +106,7 @@ where
|
|||||||
-> Poll<Result<Self::Substream, io::Error>>
|
-> Poll<Result<Self::Substream, io::Error>>
|
||||||
{
|
{
|
||||||
let stream_id = ready!(self.io.lock().poll_open_stream(cx))?;
|
let stream_id = ready!(self.io.lock().poll_open_stream(cx))?;
|
||||||
return Poll::Ready(Ok(Substream::new(stream_id)))
|
Poll::Ready(Ok(Substream::new(stream_id)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
|
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
|
||||||
|
@ -34,7 +34,6 @@ use libp2p_swarm::{
|
|||||||
DialPeerCondition,
|
DialPeerCondition,
|
||||||
};
|
};
|
||||||
use log::warn;
|
use log::warn;
|
||||||
use rand;
|
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::{collections::VecDeque, iter};
|
use std::{collections::VecDeque, iter};
|
||||||
use std::collections::hash_map::{DefaultHasher, HashMap};
|
use std::collections::hash_map::{DefaultHasher, HashMap};
|
||||||
@ -89,7 +88,7 @@ impl Floodsub {
|
|||||||
if self.connected_peers.contains_key(&peer_id) {
|
if self.connected_peers.contains_key(&peer_id) {
|
||||||
for topic in self.subscribed_topics.iter().cloned() {
|
for topic in self.subscribed_topics.iter().cloned() {
|
||||||
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
|
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||||
peer_id: peer_id.clone(),
|
peer_id,
|
||||||
handler: NotifyHandler::Any,
|
handler: NotifyHandler::Any,
|
||||||
event: FloodsubRpc {
|
event: FloodsubRpc {
|
||||||
messages: Vec::new(),
|
messages: Vec::new(),
|
||||||
@ -102,7 +101,7 @@ impl Floodsub {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.target_peers.insert(peer_id.clone()) {
|
if self.target_peers.insert(peer_id) {
|
||||||
self.events.push_back(NetworkBehaviourAction::DialPeer {
|
self.events.push_back(NetworkBehaviourAction::DialPeer {
|
||||||
peer_id, condition: DialPeerCondition::Disconnected
|
peer_id, condition: DialPeerCondition::Disconnected
|
||||||
});
|
});
|
||||||
@ -125,7 +124,7 @@ impl Floodsub {
|
|||||||
|
|
||||||
for peer in self.connected_peers.keys() {
|
for peer in self.connected_peers.keys() {
|
||||||
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
|
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||||
peer_id: peer.clone(),
|
peer_id: *peer,
|
||||||
handler: NotifyHandler::Any,
|
handler: NotifyHandler::Any,
|
||||||
event: FloodsubRpc {
|
event: FloodsubRpc {
|
||||||
messages: Vec::new(),
|
messages: Vec::new(),
|
||||||
@ -156,7 +155,7 @@ impl Floodsub {
|
|||||||
|
|
||||||
for peer in self.connected_peers.keys() {
|
for peer in self.connected_peers.keys() {
|
||||||
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
|
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||||
peer_id: peer.clone(),
|
peer_id: *peer,
|
||||||
handler: NotifyHandler::Any,
|
handler: NotifyHandler::Any,
|
||||||
event: FloodsubRpc {
|
event: FloodsubRpc {
|
||||||
messages: Vec::new(),
|
messages: Vec::new(),
|
||||||
@ -196,7 +195,7 @@ impl Floodsub {
|
|||||||
|
|
||||||
fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
|
fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
|
||||||
let message = FloodsubMessage {
|
let message = FloodsubMessage {
|
||||||
source: self.config.local_peer_id.clone(),
|
source: self.config.local_peer_id,
|
||||||
data: data.into(),
|
data: data.into(),
|
||||||
// If the sequence numbers are predictable, then an attacker could flood the network
|
// If the sequence numbers are predictable, then an attacker could flood the network
|
||||||
// with packets with the predetermined sequence numbers and absorb our legitimate
|
// with packets with the predetermined sequence numbers and absorb our legitimate
|
||||||
@ -231,7 +230,7 @@ impl Floodsub {
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
|
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||||
peer_id: peer_id.clone(),
|
peer_id: *peer_id,
|
||||||
handler: NotifyHandler::Any,
|
handler: NotifyHandler::Any,
|
||||||
event: FloodsubRpc {
|
event: FloodsubRpc {
|
||||||
subscriptions: Vec::new(),
|
subscriptions: Vec::new(),
|
||||||
@ -259,7 +258,7 @@ impl NetworkBehaviour for Floodsub {
|
|||||||
if self.target_peers.contains(id) {
|
if self.target_peers.contains(id) {
|
||||||
for topic in self.subscribed_topics.iter().cloned() {
|
for topic in self.subscribed_topics.iter().cloned() {
|
||||||
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
|
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||||
peer_id: id.clone(),
|
peer_id: *id,
|
||||||
handler: NotifyHandler::Any,
|
handler: NotifyHandler::Any,
|
||||||
event: FloodsubRpc {
|
event: FloodsubRpc {
|
||||||
messages: Vec::new(),
|
messages: Vec::new(),
|
||||||
@ -272,7 +271,7 @@ impl NetworkBehaviour for Floodsub {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.connected_peers.insert(id.clone(), SmallVec::new());
|
self.connected_peers.insert(*id, SmallVec::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_disconnected(&mut self, id: &PeerId) {
|
fn inject_disconnected(&mut self, id: &PeerId) {
|
||||||
@ -283,7 +282,7 @@ impl NetworkBehaviour for Floodsub {
|
|||||||
// try to reconnect.
|
// try to reconnect.
|
||||||
if self.target_peers.contains(id) {
|
if self.target_peers.contains(id) {
|
||||||
self.events.push_back(NetworkBehaviourAction::DialPeer {
|
self.events.push_back(NetworkBehaviourAction::DialPeer {
|
||||||
peer_id: id.clone(),
|
peer_id: *id,
|
||||||
condition: DialPeerCondition::Disconnected
|
condition: DialPeerCondition::Disconnected
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -312,7 +311,7 @@ impl NetworkBehaviour for Floodsub {
|
|||||||
remote_peer_topics.push(subscription.topic.clone());
|
remote_peer_topics.push(subscription.topic.clone());
|
||||||
}
|
}
|
||||||
self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Subscribed {
|
self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Subscribed {
|
||||||
peer_id: propagation_source.clone(),
|
peer_id: propagation_source,
|
||||||
topic: subscription.topic,
|
topic: subscription.topic,
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
@ -321,7 +320,7 @@ impl NetworkBehaviour for Floodsub {
|
|||||||
remote_peer_topics.remove(pos);
|
remote_peer_topics.remove(pos);
|
||||||
}
|
}
|
||||||
self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Unsubscribed {
|
self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Unsubscribed {
|
||||||
peer_id: propagation_source.clone(),
|
peer_id: propagation_source,
|
||||||
topic: subscription.topic,
|
topic: subscription.topic,
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
@ -364,7 +363,7 @@ impl NetworkBehaviour for Floodsub {
|
|||||||
if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
|
if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
|
||||||
rpcs_to_dispatch[pos].1.messages.push(message.clone());
|
rpcs_to_dispatch[pos].1.messages.push(message.clone());
|
||||||
} else {
|
} else {
|
||||||
rpcs_to_dispatch.push((peer_id.clone(), FloodsubRpc {
|
rpcs_to_dispatch.push((*peer_id, FloodsubRpc {
|
||||||
subscriptions: Vec::new(),
|
subscriptions: Vec::new(),
|
||||||
messages: vec![message.clone()],
|
messages: vec![message.clone()],
|
||||||
}));
|
}));
|
||||||
|
@ -78,7 +78,7 @@ impl BackoffStorage {
|
|||||||
backoffs_by_heartbeat: &mut Vec<HashSet<_>>,
|
backoffs_by_heartbeat: &mut Vec<HashSet<_>>,
|
||||||
heartbeat_interval,
|
heartbeat_interval,
|
||||||
backoff_slack| {
|
backoff_slack| {
|
||||||
let pair = (topic.clone(), peer.clone());
|
let pair = (topic.clone(), *peer);
|
||||||
let index = (heartbeat_index.0
|
let index = (heartbeat_index.0
|
||||||
+ Self::heartbeats(&time, heartbeat_interval)
|
+ Self::heartbeats(&time, heartbeat_interval)
|
||||||
+ backoff_slack as usize)
|
+ backoff_slack as usize)
|
||||||
@ -90,12 +90,12 @@ impl BackoffStorage {
|
|||||||
.backoffs
|
.backoffs
|
||||||
.entry(topic.clone())
|
.entry(topic.clone())
|
||||||
.or_insert_with(HashMap::new)
|
.or_insert_with(HashMap::new)
|
||||||
.entry(peer.clone())
|
.entry(*peer)
|
||||||
{
|
{
|
||||||
Entry::Occupied(mut o) => {
|
Entry::Occupied(mut o) => {
|
||||||
let (backoff, index) = o.get();
|
let (backoff, index) = o.get();
|
||||||
if backoff < &instant {
|
if backoff < &instant {
|
||||||
let pair = (topic.clone(), peer.clone());
|
let pair = (topic.clone(), *peer);
|
||||||
if let Some(s) = self.backoffs_by_heartbeat.get_mut(index.0) {
|
if let Some(s) = self.backoffs_by_heartbeat.get_mut(index.0) {
|
||||||
s.remove(&pair);
|
s.remove(&pair);
|
||||||
}
|
}
|
||||||
|
@ -574,7 +574,7 @@ where
|
|||||||
|
|
||||||
// calculate the message id from the un-transformed data
|
// calculate the message id from the un-transformed data
|
||||||
let msg_id = self.config.message_id(&GossipsubMessage {
|
let msg_id = self.config.message_id(&GossipsubMessage {
|
||||||
source: raw_message.source.clone(),
|
source: raw_message.source,
|
||||||
data, // the uncompressed form
|
data, // the uncompressed form
|
||||||
sequence_number: raw_message.sequence_number,
|
sequence_number: raw_message.sequence_number,
|
||||||
topic: raw_message.topic.clone(),
|
topic: raw_message.topic.clone(),
|
||||||
@ -629,7 +629,7 @@ where
|
|||||||
// Explicit peers
|
// Explicit peers
|
||||||
for peer in &self.explicit_peers {
|
for peer in &self.explicit_peers {
|
||||||
if set.contains(peer) {
|
if set.contains(peer) {
|
||||||
recipient_peers.insert(peer.clone());
|
recipient_peers.insert(*peer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -640,7 +640,7 @@ where
|
|||||||
.score_below_threshold(peer, |ts| ts.publish_threshold)
|
.score_below_threshold(peer, |ts| ts.publish_threshold)
|
||||||
.0
|
.0
|
||||||
{
|
{
|
||||||
recipient_peers.insert(peer.clone());
|
recipient_peers.insert(*peer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -650,7 +650,7 @@ where
|
|||||||
// If we have fanout peers add them to the map.
|
// If we have fanout peers add them to the map.
|
||||||
if self.fanout.contains_key(&topic_hash) {
|
if self.fanout.contains_key(&topic_hash) {
|
||||||
for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
|
for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
|
||||||
recipient_peers.insert(peer.clone());
|
recipient_peers.insert(*peer);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// We have no fanout peers, select mesh_n of them and add them to the fanout
|
// We have no fanout peers, select mesh_n of them and add them to the fanout
|
||||||
@ -673,7 +673,7 @@ where
|
|||||||
self.fanout.insert(topic_hash.clone(), new_peers.clone());
|
self.fanout.insert(topic_hash.clone(), new_peers.clone());
|
||||||
for peer in new_peers {
|
for peer in new_peers {
|
||||||
debug!("Peer added to fanout: {:?}", peer);
|
debug!("Peer added to fanout: {:?}", peer);
|
||||||
recipient_peers.insert(peer.clone());
|
recipient_peers.insert(peer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// We are publishing to fanout peers - update the time we published
|
// We are publishing to fanout peers - update the time we published
|
||||||
@ -703,7 +703,7 @@ where
|
|||||||
// Send to peers we know are subscribed to the topic.
|
// Send to peers we know are subscribed to the topic.
|
||||||
for peer_id in recipient_peers.iter() {
|
for peer_id in recipient_peers.iter() {
|
||||||
debug!("Sending message to peer: {:?}", peer_id);
|
debug!("Sending message to peer: {:?}", peer_id);
|
||||||
self.send_message(peer_id.clone(), event.clone())?;
|
self.send_message(*peer_id, event.clone())?;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Published message: {:?}", &msg_id);
|
info!("Published message: {:?}", &msg_id);
|
||||||
@ -775,7 +775,7 @@ where
|
|||||||
pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
|
pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
|
||||||
debug!("Adding explicit peer {}", peer_id);
|
debug!("Adding explicit peer {}", peer_id);
|
||||||
|
|
||||||
self.explicit_peers.insert(peer_id.clone());
|
self.explicit_peers.insert(*peer_id);
|
||||||
|
|
||||||
self.check_explicit_peer_connection(peer_id);
|
self.check_explicit_peer_connection(peer_id);
|
||||||
}
|
}
|
||||||
@ -790,7 +790,7 @@ where
|
|||||||
/// Blacklists a peer. All messages from this peer will be rejected and any message that was
|
/// Blacklists a peer. All messages from this peer will be rejected and any message that was
|
||||||
/// created by this peer will be rejected.
|
/// created by this peer will be rejected.
|
||||||
pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
|
pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
|
||||||
if self.blacklisted_peers.insert(peer_id.clone()) {
|
if self.blacklisted_peers.insert(*peer_id) {
|
||||||
debug!("Peer has been blacklisted: {}", peer_id);
|
debug!("Peer has been blacklisted: {}", peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -942,7 +942,7 @@ where
|
|||||||
}
|
}
|
||||||
Self::control_pool_add(
|
Self::control_pool_add(
|
||||||
&mut self.control_pool,
|
&mut self.control_pool,
|
||||||
peer_id.clone(),
|
peer_id,
|
||||||
GossipsubControlAction::Graft {
|
GossipsubControlAction::Graft {
|
||||||
topic_hash: topic_hash.clone(),
|
topic_hash: topic_hash.clone(),
|
||||||
},
|
},
|
||||||
@ -1017,7 +1017,7 @@ where
|
|||||||
// Send a PRUNE control message
|
// Send a PRUNE control message
|
||||||
info!("LEAVE: Sending PRUNE to peer: {:?}", peer);
|
info!("LEAVE: Sending PRUNE to peer: {:?}", peer);
|
||||||
let control = self.make_prune(topic_hash, &peer, self.config.do_px());
|
let control = self.make_prune(topic_hash, &peer, self.config.do_px());
|
||||||
Self::control_pool_add(&mut self.control_pool, peer.clone(), control);
|
Self::control_pool_add(&mut self.control_pool, peer, control);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
debug!("Completed LEAVE for topic: {:?}", topic_hash);
|
debug!("Completed LEAVE for topic: {:?}", topic_hash);
|
||||||
@ -1029,7 +1029,7 @@ where
|
|||||||
// Connect to peer
|
// Connect to peer
|
||||||
debug!("Connecting to explicit peer {:?}", peer_id);
|
debug!("Connecting to explicit peer {:?}", peer_id);
|
||||||
self.events.push_back(NetworkBehaviourAction::DialPeer {
|
self.events.push_back(NetworkBehaviourAction::DialPeer {
|
||||||
peer_id: peer_id.clone(),
|
peer_id: *peer_id,
|
||||||
condition: DialPeerCondition::Disconnected,
|
condition: DialPeerCondition::Disconnected,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -1076,7 +1076,7 @@ where
|
|||||||
// IHAVE flood protection
|
// IHAVE flood protection
|
||||||
let peer_have = self
|
let peer_have = self
|
||||||
.count_received_ihave
|
.count_received_ihave
|
||||||
.entry(peer_id.clone())
|
.entry(*peer_id)
|
||||||
.or_insert(0);
|
.or_insert(0);
|
||||||
*peer_have += 1;
|
*peer_have += 1;
|
||||||
if *peer_have > self.config.max_ihave_messages() {
|
if *peer_have > self.config.max_ihave_messages() {
|
||||||
@ -1122,7 +1122,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !iwant_ids.is_empty() {
|
if !iwant_ids.is_empty() {
|
||||||
let iasked = self.count_sent_iwant.entry(peer_id.clone()).or_insert(0);
|
let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
|
||||||
let mut iask = iwant_ids.len();
|
let mut iask = iwant_ids.len();
|
||||||
if *iasked + iask > self.config.max_ihave_length() {
|
if *iasked + iask > self.config.max_ihave_length() {
|
||||||
iask = self.config.max_ihave_length().saturating_sub(*iasked);
|
iask = self.config.max_ihave_length().saturating_sub(*iasked);
|
||||||
@ -1147,7 +1147,7 @@ where
|
|||||||
let message_ids = iwant_ids_vec.into_iter().cloned().collect::<Vec<_>>();
|
let message_ids = iwant_ids_vec.into_iter().cloned().collect::<Vec<_>>();
|
||||||
if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
|
if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
|
||||||
gossip_promises.add_promise(
|
gossip_promises.add_promise(
|
||||||
peer_id.clone(),
|
*peer_id,
|
||||||
&message_ids,
|
&message_ids,
|
||||||
Instant::now() + self.config.iwant_followup_time(),
|
Instant::now() + self.config.iwant_followup_time(),
|
||||||
);
|
);
|
||||||
@ -1159,7 +1159,7 @@ where
|
|||||||
|
|
||||||
Self::control_pool_add(
|
Self::control_pool_add(
|
||||||
&mut self.control_pool,
|
&mut self.control_pool,
|
||||||
peer_id.clone(),
|
*peer_id,
|
||||||
GossipsubControlAction::IWant { message_ids },
|
GossipsubControlAction::IWant { message_ids },
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -1203,11 +1203,11 @@ where
|
|||||||
// Send the messages to the peer
|
// Send the messages to the peer
|
||||||
let message_list = cached_messages
|
let message_list = cached_messages
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|entry| RawGossipsubMessage::from(entry.1))
|
.map(|entry| entry.1)
|
||||||
.collect();
|
.collect();
|
||||||
if self
|
if self
|
||||||
.send_message(
|
.send_message(
|
||||||
peer_id.clone(),
|
*peer_id,
|
||||||
GossipsubRpc {
|
GossipsubRpc {
|
||||||
subscriptions: Vec::new(),
|
subscriptions: Vec::new(),
|
||||||
messages: message_list,
|
messages: message_list,
|
||||||
@ -1311,7 +1311,7 @@ where
|
|||||||
"GRAFT: Mesh link added for peer: {:?} in topic: {:?}",
|
"GRAFT: Mesh link added for peer: {:?} in topic: {:?}",
|
||||||
peer_id, &topic_hash
|
peer_id, &topic_hash
|
||||||
);
|
);
|
||||||
peers.insert(peer_id.clone());
|
peers.insert(*peer_id);
|
||||||
|
|
||||||
if let Some((peer_score, ..)) = &mut self.peer_score {
|
if let Some((peer_score, ..)) = &mut self.peer_score {
|
||||||
peer_score.graft(peer_id, topic_hash);
|
peer_score.graft(peer_id, topic_hash);
|
||||||
@ -1343,7 +1343,7 @@ where
|
|||||||
|
|
||||||
if self
|
if self
|
||||||
.send_message(
|
.send_message(
|
||||||
peer_id.clone(),
|
*peer_id,
|
||||||
GossipsubRpc {
|
GossipsubRpc {
|
||||||
subscriptions: Vec::new(),
|
subscriptions: Vec::new(),
|
||||||
messages: Vec::new(),
|
messages: Vec::new(),
|
||||||
@ -1453,7 +1453,7 @@ where
|
|||||||
// it, see https://github.com/libp2p/specs/pull/217
|
// it, see https://github.com/libp2p/specs/pull/217
|
||||||
if let Some(peer_id) = p.peer_id {
|
if let Some(peer_id) = p.peer_id {
|
||||||
// mark as px peer
|
// mark as px peer
|
||||||
self.px_peers.insert(peer_id.clone());
|
self.px_peers.insert(peer_id);
|
||||||
|
|
||||||
// dial peer
|
// dial peer
|
||||||
self.events.push_back(NetworkBehaviourAction::DialPeer {
|
self.events.push_back(NetworkBehaviourAction::DialPeer {
|
||||||
@ -1607,7 +1607,7 @@ where
|
|||||||
if !self.duplicate_cache.insert(msg_id.clone()) {
|
if !self.duplicate_cache.insert(msg_id.clone()) {
|
||||||
debug!(
|
debug!(
|
||||||
"Message already received, ignoring. Message: {}",
|
"Message already received, ignoring. Message: {}",
|
||||||
msg_id.clone()
|
msg_id
|
||||||
);
|
);
|
||||||
if let Some((peer_score, ..)) = &mut self.peer_score {
|
if let Some((peer_score, ..)) = &mut self.peer_score {
|
||||||
peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
|
peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
|
||||||
@ -1634,7 +1634,7 @@ where
|
|||||||
debug!("Sending received message to user");
|
debug!("Sending received message to user");
|
||||||
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
|
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
GossipsubEvent::Message {
|
GossipsubEvent::Message {
|
||||||
propagation_source: propagation_source.clone(),
|
propagation_source: *propagation_source,
|
||||||
message_id: msg_id.clone(),
|
message_id: msg_id.clone(),
|
||||||
message,
|
message,
|
||||||
},
|
},
|
||||||
@ -1740,7 +1740,7 @@ where
|
|||||||
|
|
||||||
match subscription.action {
|
match subscription.action {
|
||||||
GossipsubSubscriptionAction::Subscribe => {
|
GossipsubSubscriptionAction::Subscribe => {
|
||||||
if peer_list.insert(propagation_source.clone()) {
|
if peer_list.insert(*propagation_source) {
|
||||||
debug!(
|
debug!(
|
||||||
"SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}",
|
"SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}",
|
||||||
propagation_source.to_string(),
|
propagation_source.to_string(),
|
||||||
@ -1770,7 +1770,7 @@ where
|
|||||||
{
|
{
|
||||||
if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
|
if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
|
||||||
if peers.len() < self.config.mesh_n_low()
|
if peers.len() < self.config.mesh_n_low()
|
||||||
&& peers.insert(propagation_source.clone())
|
&& peers.insert(*propagation_source)
|
||||||
{
|
{
|
||||||
debug!(
|
debug!(
|
||||||
"SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}",
|
"SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}",
|
||||||
@ -1796,7 +1796,7 @@ where
|
|||||||
// generates a subscription event to be polled
|
// generates a subscription event to be polled
|
||||||
application_event.push(NetworkBehaviourAction::GenerateEvent(
|
application_event.push(NetworkBehaviourAction::GenerateEvent(
|
||||||
GossipsubEvent::Subscribed {
|
GossipsubEvent::Subscribed {
|
||||||
peer_id: propagation_source.clone(),
|
peer_id: *propagation_source,
|
||||||
topic: subscription.topic_hash.clone(),
|
topic: subscription.topic_hash.clone(),
|
||||||
},
|
},
|
||||||
));
|
));
|
||||||
@ -1812,11 +1812,11 @@ where
|
|||||||
// remove topic from the peer_topics mapping
|
// remove topic from the peer_topics mapping
|
||||||
subscribed_topics.remove(&subscription.topic_hash);
|
subscribed_topics.remove(&subscription.topic_hash);
|
||||||
unsubscribed_peers
|
unsubscribed_peers
|
||||||
.push((propagation_source.clone(), subscription.topic_hash.clone()));
|
.push((*propagation_source, subscription.topic_hash.clone()));
|
||||||
// generate an unsubscribe event to be polled
|
// generate an unsubscribe event to be polled
|
||||||
application_event.push(NetworkBehaviourAction::GenerateEvent(
|
application_event.push(NetworkBehaviourAction::GenerateEvent(
|
||||||
GossipsubEvent::Unsubscribed {
|
GossipsubEvent::Unsubscribed {
|
||||||
peer_id: propagation_source.clone(),
|
peer_id: *propagation_source,
|
||||||
topic: subscription.topic_hash.clone(),
|
topic: subscription.topic_hash.clone(),
|
||||||
},
|
},
|
||||||
));
|
));
|
||||||
@ -1834,7 +1834,7 @@ where
|
|||||||
if !grafts.is_empty()
|
if !grafts.is_empty()
|
||||||
&& self
|
&& self
|
||||||
.send_message(
|
.send_message(
|
||||||
propagation_source.clone(),
|
*propagation_source,
|
||||||
GossipsubRpc {
|
GossipsubRpc {
|
||||||
subscriptions: Vec::new(),
|
subscriptions: Vec::new(),
|
||||||
messages: Vec::new(),
|
messages: Vec::new(),
|
||||||
@ -1899,7 +1899,7 @@ where
|
|||||||
let peer_score = &self.peer_score;
|
let peer_score = &self.peer_score;
|
||||||
let mut score = |p: &PeerId| match peer_score {
|
let mut score = |p: &PeerId| match peer_score {
|
||||||
Some((peer_score, ..)) => *scores
|
Some((peer_score, ..)) => *scores
|
||||||
.entry(p.clone())
|
.entry(*p)
|
||||||
.or_insert_with(|| peer_score.score(p)),
|
.or_insert_with(|| peer_score.score(p)),
|
||||||
_ => 0.0,
|
_ => 0.0,
|
||||||
};
|
};
|
||||||
@ -1926,9 +1926,9 @@ where
|
|||||||
topic_hash
|
topic_hash
|
||||||
);
|
);
|
||||||
|
|
||||||
let current_topic = to_prune.entry(p.clone()).or_insert_with(Vec::new);
|
let current_topic = to_prune.entry(*p).or_insert_with(Vec::new);
|
||||||
current_topic.push(topic_hash.clone());
|
current_topic.push(topic_hash.clone());
|
||||||
no_px.insert(p.clone());
|
no_px.insert(*p);
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
@ -1963,7 +1963,7 @@ where
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
for peer in &peer_list {
|
for peer in &peer_list {
|
||||||
let current_topic = to_graft.entry(peer.clone()).or_insert_with(Vec::new);
|
let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
|
||||||
current_topic.push(topic_hash.clone());
|
current_topic.push(topic_hash.clone());
|
||||||
}
|
}
|
||||||
// update the mesh
|
// update the mesh
|
||||||
@ -2046,7 +2046,7 @@ where
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
for peer in &peer_list {
|
for peer in &peer_list {
|
||||||
let current_topic = to_graft.entry(peer.clone()).or_insert_with(Vec::new);
|
let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
|
||||||
current_topic.push(topic_hash.clone());
|
current_topic.push(topic_hash.clone());
|
||||||
}
|
}
|
||||||
// update the mesh
|
// update the mesh
|
||||||
@ -2102,8 +2102,7 @@ where
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
for peer in &peer_list {
|
for peer in &peer_list {
|
||||||
let current_topic =
|
let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
|
||||||
to_graft.entry(peer.clone()).or_insert_with(Vec::new);
|
|
||||||
current_topic.push(topic_hash.clone());
|
current_topic.push(topic_hash.clone());
|
||||||
}
|
}
|
||||||
// update the mesh
|
// update the mesh
|
||||||
@ -2151,12 +2150,12 @@ where
|
|||||||
"HEARTBEAT: Peer removed from fanout for topic: {:?}",
|
"HEARTBEAT: Peer removed from fanout for topic: {:?}",
|
||||||
topic_hash
|
topic_hash
|
||||||
);
|
);
|
||||||
to_remove_peers.push(peer.clone());
|
to_remove_peers.push(*peer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
// remove if the peer has disconnected
|
// remove if the peer has disconnected
|
||||||
to_remove_peers.push(peer.clone());
|
to_remove_peers.push(*peer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2205,7 +2204,7 @@ where
|
|||||||
.iter()
|
.iter()
|
||||||
.map(|p| {
|
.map(|p| {
|
||||||
(
|
(
|
||||||
p.clone(),
|
*p,
|
||||||
peer_score
|
peer_score
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("peer_score.is_some()")
|
.expect("peer_score.is_some()")
|
||||||
@ -2295,7 +2294,7 @@ where
|
|||||||
// send an IHAVE message
|
// send an IHAVE message
|
||||||
Self::control_pool_add(
|
Self::control_pool_add(
|
||||||
&mut self.control_pool,
|
&mut self.control_pool,
|
||||||
peer.clone(),
|
peer,
|
||||||
GossipsubControlAction::IHave {
|
GossipsubControlAction::IHave {
|
||||||
topic_hash: topic_hash.clone(),
|
topic_hash: topic_hash.clone(),
|
||||||
message_ids: peer_message_ids,
|
message_ids: peer_message_ids,
|
||||||
@ -2346,7 +2345,7 @@ where
|
|||||||
// send the control messages
|
// send the control messages
|
||||||
if self
|
if self
|
||||||
.send_message(
|
.send_message(
|
||||||
peer.clone(),
|
*peer,
|
||||||
GossipsubRpc {
|
GossipsubRpc {
|
||||||
subscriptions: Vec::new(),
|
subscriptions: Vec::new(),
|
||||||
messages: Vec::new(),
|
messages: Vec::new(),
|
||||||
@ -2374,7 +2373,7 @@ where
|
|||||||
.collect();
|
.collect();
|
||||||
if self
|
if self
|
||||||
.send_message(
|
.send_message(
|
||||||
peer.clone(),
|
*peer,
|
||||||
GossipsubRpc {
|
GossipsubRpc {
|
||||||
subscriptions: Vec::new(),
|
subscriptions: Vec::new(),
|
||||||
messages: Vec::new(),
|
messages: Vec::new(),
|
||||||
@ -2414,7 +2413,7 @@ where
|
|||||||
if let Some(mesh_peers) = self.mesh.get(&topic) {
|
if let Some(mesh_peers) = self.mesh.get(&topic) {
|
||||||
for peer_id in mesh_peers {
|
for peer_id in mesh_peers {
|
||||||
if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() {
|
if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() {
|
||||||
recipient_peers.insert(peer_id.clone());
|
recipient_peers.insert(*peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2426,7 +2425,7 @@ where
|
|||||||
&& Some(p) != message.source.as_ref()
|
&& Some(p) != message.source.as_ref()
|
||||||
&& topics.contains(&message.topic)
|
&& topics.contains(&message.topic)
|
||||||
{
|
{
|
||||||
recipient_peers.insert(p.clone());
|
recipient_peers.insert(*p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2436,7 +2435,7 @@ where
|
|||||||
let event = Arc::new(
|
let event = Arc::new(
|
||||||
GossipsubRpc {
|
GossipsubRpc {
|
||||||
subscriptions: Vec::new(),
|
subscriptions: Vec::new(),
|
||||||
messages: vec![RawGossipsubMessage::from(message.clone())],
|
messages: vec![message.clone()],
|
||||||
control_msgs: Vec::new(),
|
control_msgs: Vec::new(),
|
||||||
}
|
}
|
||||||
.into_protobuf(),
|
.into_protobuf(),
|
||||||
@ -2444,7 +2443,7 @@ where
|
|||||||
|
|
||||||
for peer in recipient_peers.iter() {
|
for peer in recipient_peers.iter() {
|
||||||
debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
|
debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
|
||||||
self.send_message(peer.clone(), event.clone())?;
|
self.send_message(*peer, event.clone())?;
|
||||||
}
|
}
|
||||||
debug!("Completed forwarding message");
|
debug!("Completed forwarding message");
|
||||||
Ok(true)
|
Ok(true)
|
||||||
@ -2490,7 +2489,7 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
Ok(RawGossipsubMessage {
|
Ok(RawGossipsubMessage {
|
||||||
source: Some(author.clone()),
|
source: Some(*author),
|
||||||
data,
|
data,
|
||||||
// To be interoperable with the go-implementation this is treated as a 64-bit
|
// To be interoperable with the go-implementation this is treated as a 64-bit
|
||||||
// big-endian uint.
|
// big-endian uint.
|
||||||
@ -2503,7 +2502,7 @@ where
|
|||||||
}
|
}
|
||||||
PublishConfig::Author(peer_id) => {
|
PublishConfig::Author(peer_id) => {
|
||||||
Ok(RawGossipsubMessage {
|
Ok(RawGossipsubMessage {
|
||||||
source: Some(peer_id.clone()),
|
source: Some(*peer_id),
|
||||||
data,
|
data,
|
||||||
// To be interoperable with the go-implementation this is treated as a 64-bit
|
// To be interoperable with the go-implementation this is treated as a 64-bit
|
||||||
// big-endian uint.
|
// big-endian uint.
|
||||||
@ -2591,7 +2590,7 @@ where
|
|||||||
for message in messages {
|
for message in messages {
|
||||||
self.events
|
self.events
|
||||||
.push_back(NetworkBehaviourAction::NotifyHandler {
|
.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||||
peer_id: peer_id.clone(),
|
peer_id,
|
||||||
event: message,
|
event: message,
|
||||||
handler: NotifyHandler::Any,
|
handler: NotifyHandler::Any,
|
||||||
})
|
})
|
||||||
@ -2774,7 +2773,7 @@ where
|
|||||||
// send our subscriptions to the peer
|
// send our subscriptions to the peer
|
||||||
if self
|
if self
|
||||||
.send_message(
|
.send_message(
|
||||||
peer_id.clone(),
|
*peer_id,
|
||||||
GossipsubRpc {
|
GossipsubRpc {
|
||||||
messages: Vec::new(),
|
messages: Vec::new(),
|
||||||
subscriptions,
|
subscriptions,
|
||||||
@ -2789,7 +2788,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Insert an empty set of the topics of this peer until known.
|
// Insert an empty set of the topics of this peer until known.
|
||||||
self.peer_topics.insert(peer_id.clone(), Default::default());
|
self.peer_topics.insert(*peer_id, Default::default());
|
||||||
|
|
||||||
// By default we assume a peer is only a floodsub peer.
|
// By default we assume a peer is only a floodsub peer.
|
||||||
//
|
//
|
||||||
@ -2797,11 +2796,11 @@ where
|
|||||||
// update the type of peer that this is in order to determine which kind of routing should
|
// update the type of peer that this is in order to determine which kind of routing should
|
||||||
// occur.
|
// occur.
|
||||||
self.peer_protocols
|
self.peer_protocols
|
||||||
.entry(peer_id.clone())
|
.entry(*peer_id)
|
||||||
.or_insert(PeerKind::Floodsub);
|
.or_insert(PeerKind::Floodsub);
|
||||||
|
|
||||||
if let Some((peer_score, ..)) = &mut self.peer_score {
|
if let Some((peer_score, ..)) = &mut self.peer_score {
|
||||||
peer_score.add_peer(peer_id.clone());
|
peer_score.add_peer(*peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2886,7 +2885,7 @@ where
|
|||||||
if !self.peer_topics.contains_key(peer_id) && !self.px_peers.contains(peer_id) {
|
if !self.peer_topics.contains_key(peer_id) && !self.px_peers.contains(peer_id) {
|
||||||
// The first connection is outbound and it is not a peer from peer exchange => mark
|
// The first connection is outbound and it is not a peer from peer exchange => mark
|
||||||
// it as outbound peer
|
// it as outbound peer
|
||||||
self.outbound_peers.insert(peer_id.clone());
|
self.outbound_peers.insert(*peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -393,7 +393,7 @@ impl Default for GossipsubConfigBuilder {
|
|||||||
let mut source_string = if let Some(peer_id) = message.source.as_ref() {
|
let mut source_string = if let Some(peer_id) = message.source.as_ref() {
|
||||||
peer_id.to_base58()
|
peer_id.to_base58()
|
||||||
} else {
|
} else {
|
||||||
PeerId::from_bytes(&vec![0, 1, 0])
|
PeerId::from_bytes(&[0, 1, 0])
|
||||||
.expect("Valid peer id")
|
.expect("Valid peer id")
|
||||||
.to_base58()
|
.to_base58()
|
||||||
};
|
};
|
||||||
|
@ -83,7 +83,7 @@ impl GossipPromises {
|
|||||||
self.promises.retain(|msg, peers| {
|
self.promises.retain(|msg, peers| {
|
||||||
peers.retain(|peer_id, expires| {
|
peers.retain(|peer_id, expires| {
|
||||||
if *expires < now {
|
if *expires < now {
|
||||||
let count = result.entry(peer_id.clone()).or_insert(0);
|
let count = result.entry(*peer_id).or_insert(0);
|
||||||
*count += 1;
|
*count += 1;
|
||||||
debug!(
|
debug!(
|
||||||
"The peer {} broke the promise to deliver message {} in time!",
|
"The peer {} broke the promise to deliver message {} in time!",
|
||||||
|
@ -110,7 +110,7 @@ impl MessageCache {
|
|||||||
let count = iwant_counts
|
let count = iwant_counts
|
||||||
.entry(message_id.clone())
|
.entry(message_id.clone())
|
||||||
.or_default()
|
.or_default()
|
||||||
.entry(peer.clone())
|
.entry(*peer)
|
||||||
.or_default();
|
.or_default();
|
||||||
*count += 1;
|
*count += 1;
|
||||||
*count
|
*count
|
||||||
|
@ -432,7 +432,7 @@ impl PeerScore {
|
|||||||
/// Adds a new ip to a peer, if the peer is not yet known creates a new peer_stats entry for it
|
/// Adds a new ip to a peer, if the peer is not yet known creates a new peer_stats entry for it
|
||||||
pub fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) {
|
pub fn add_ip(&mut self, peer_id: &PeerId, ip: IpAddr) {
|
||||||
trace!("Add ip for peer {}, ip: {}", peer_id, ip);
|
trace!("Add ip for peer {}, ip: {}", peer_id, ip);
|
||||||
let peer_stats = self.peer_stats.entry(peer_id.clone()).or_default();
|
let peer_stats = self.peer_stats.entry(*peer_id).or_default();
|
||||||
|
|
||||||
// Mark the peer as connected (currently the default is connected, but we don't want to
|
// Mark the peer as connected (currently the default is connected, but we don't want to
|
||||||
// rely on the default).
|
// rely on the default).
|
||||||
@ -443,7 +443,7 @@ impl PeerScore {
|
|||||||
self.peer_ips
|
self.peer_ips
|
||||||
.entry(ip)
|
.entry(ip)
|
||||||
.or_insert_with(HashSet::new)
|
.or_insert_with(HashSet::new)
|
||||||
.insert(peer_id.clone());
|
.insert(*peer_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes an ip from a peer
|
/// Removes an ip from a peer
|
||||||
@ -474,7 +474,7 @@ impl PeerScore {
|
|||||||
pub fn remove_peer(&mut self, peer_id: &PeerId) {
|
pub fn remove_peer(&mut self, peer_id: &PeerId) {
|
||||||
// we only retain non-positive scores of peers
|
// we only retain non-positive scores of peers
|
||||||
if self.score(peer_id) > 0f64 {
|
if self.score(peer_id) > 0f64 {
|
||||||
if let hash_map::Entry::Occupied(entry) = self.peer_stats.entry(peer_id.clone()) {
|
if let hash_map::Entry::Occupied(entry) = self.peer_stats.entry(*peer_id) {
|
||||||
Self::remove_ips_for_peer(entry.get(), &mut self.peer_ips, peer_id);
|
Self::remove_ips_for_peer(entry.get(), &mut self.peer_ips, peer_id);
|
||||||
entry.remove();
|
entry.remove();
|
||||||
}
|
}
|
||||||
@ -692,11 +692,11 @@ impl PeerScore {
|
|||||||
DeliveryStatus::Unknown => {
|
DeliveryStatus::Unknown => {
|
||||||
// the message is being validated; track the peer delivery and wait for
|
// the message is being validated; track the peer delivery and wait for
|
||||||
// the Deliver/Reject notification.
|
// the Deliver/Reject notification.
|
||||||
record.peers.insert(from.clone());
|
record.peers.insert(*from);
|
||||||
}
|
}
|
||||||
DeliveryStatus::Valid(validated) => {
|
DeliveryStatus::Valid(validated) => {
|
||||||
// mark the peer delivery time to only count a duplicate delivery once.
|
// mark the peer delivery time to only count a duplicate delivery once.
|
||||||
record.peers.insert(from.clone());
|
record.peers.insert(*from);
|
||||||
self.mark_duplicate_message_delivery(from, topic_hash, Some(validated));
|
self.mark_duplicate_message_delivery(from, topic_hash, Some(validated));
|
||||||
}
|
}
|
||||||
DeliveryStatus::Invalid => {
|
DeliveryStatus::Invalid => {
|
||||||
|
@ -117,7 +117,7 @@ impl NetworkBehaviour for Identify {
|
|||||||
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
|
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
self.observed_addresses.entry(peer_id.clone()).or_default().insert(*conn, addr);
|
self.observed_addresses.entry(*peer_id).or_default().insert(*conn, addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) {
|
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) {
|
||||||
|
@ -35,13 +35,12 @@ use std::{fmt, io, iter, pin::Pin};
|
|||||||
pub struct IdentifyProtocolConfig;
|
pub struct IdentifyProtocolConfig;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
#[non_exhaustive]
|
||||||
pub struct RemoteInfo {
|
pub struct RemoteInfo {
|
||||||
/// Information about the remote.
|
/// Information about the remote.
|
||||||
pub info: IdentifyInfo,
|
pub info: IdentifyInfo,
|
||||||
/// Address the remote sees for us.
|
/// Address the remote sees for us.
|
||||||
pub observed_addr: Multiaddr,
|
pub observed_addr: Multiaddr,
|
||||||
|
|
||||||
_priv: ()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The substream on which a reply is expected to be sent.
|
/// The substream on which a reply is expected to be sent.
|
||||||
@ -80,7 +79,7 @@ where
|
|||||||
agent_version: Some(info.agent_version),
|
agent_version: Some(info.agent_version),
|
||||||
protocol_version: Some(info.protocol_version),
|
protocol_version: Some(info.protocol_version),
|
||||||
public_key: Some(pubkey_bytes),
|
public_key: Some(pubkey_bytes),
|
||||||
listen_addrs: listen_addrs,
|
listen_addrs,
|
||||||
observed_addr: Some(observed_addr.to_vec()),
|
observed_addr: Some(observed_addr.to_vec()),
|
||||||
protocols: info.protocols
|
protocols: info.protocols
|
||||||
};
|
};
|
||||||
@ -158,8 +157,7 @@ where
|
|||||||
|
|
||||||
Ok(RemoteInfo {
|
Ok(RemoteInfo {
|
||||||
info,
|
info,
|
||||||
observed_addr: observed_addr.clone(),
|
observed_addr,
|
||||||
_priv: ()
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ pub struct Addresses {
|
|||||||
addrs: SmallVec<[Multiaddr; 6]>,
|
addrs: SmallVec<[Multiaddr; 6]>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::len_without_is_empty)]
|
||||||
impl Addresses {
|
impl Addresses {
|
||||||
/// Creates a new list of addresses.
|
/// Creates a new list of addresses.
|
||||||
pub fn new(addr: Multiaddr) -> Addresses {
|
pub fn new(addr: Multiaddr) -> Addresses {
|
||||||
|
@ -343,7 +343,7 @@ where
|
|||||||
.record_replication_interval
|
.record_replication_interval
|
||||||
.or(config.record_publication_interval)
|
.or(config.record_publication_interval)
|
||||||
.map(|interval| PutRecordJob::new(
|
.map(|interval| PutRecordJob::new(
|
||||||
id.clone(),
|
id,
|
||||||
interval,
|
interval,
|
||||||
config.record_publication_interval,
|
config.record_publication_interval,
|
||||||
config.record_ttl,
|
config.record_ttl,
|
||||||
@ -371,7 +371,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Gets an iterator over immutable references to all running queries.
|
/// Gets an iterator over immutable references to all running queries.
|
||||||
pub fn iter_queries<'a>(&'a self) -> impl Iterator<Item = QueryRef<'a>> {
|
pub fn iter_queries(&self) -> impl Iterator<Item = QueryRef<'_>> {
|
||||||
self.queries.iter().filter_map(|query|
|
self.queries.iter().filter_map(|query|
|
||||||
if !query.is_finished() {
|
if !query.is_finished() {
|
||||||
Some(QueryRef { query })
|
Some(QueryRef { query })
|
||||||
@ -381,7 +381,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Gets an iterator over mutable references to all running queries.
|
/// Gets an iterator over mutable references to all running queries.
|
||||||
pub fn iter_queries_mut<'a>(&'a mut self) -> impl Iterator<Item = QueryMut<'a>> {
|
pub fn iter_queries_mut(&mut self) -> impl Iterator<Item = QueryMut<'_>> {
|
||||||
self.queries.iter_mut().filter_map(|query|
|
self.queries.iter_mut().filter_map(|query|
|
||||||
if !query.is_finished() {
|
if !query.is_finished() {
|
||||||
Some(QueryMut { query })
|
Some(QueryMut { query })
|
||||||
@ -391,7 +391,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Gets an immutable reference to a running query, if it exists.
|
/// Gets an immutable reference to a running query, if it exists.
|
||||||
pub fn query<'a>(&'a self, id: &QueryId) -> Option<QueryRef<'a>> {
|
pub fn query(&self, id: &QueryId) -> Option<QueryRef<'_>> {
|
||||||
self.queries.get(id).and_then(|query|
|
self.queries.get(id).and_then(|query|
|
||||||
if !query.is_finished() {
|
if !query.is_finished() {
|
||||||
Some(QueryRef { query })
|
Some(QueryRef { query })
|
||||||
@ -434,7 +434,7 @@ where
|
|||||||
if entry.value().insert(address) {
|
if entry.value().insert(address) {
|
||||||
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
KademliaEvent::RoutingUpdated {
|
KademliaEvent::RoutingUpdated {
|
||||||
peer: peer.clone(),
|
peer: *peer,
|
||||||
addresses: entry.value().clone(),
|
addresses: entry.value().clone(),
|
||||||
old_peer: None,
|
old_peer: None,
|
||||||
}
|
}
|
||||||
@ -458,7 +458,7 @@ where
|
|||||||
kbucket::InsertResult::Inserted => {
|
kbucket::InsertResult::Inserted => {
|
||||||
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
KademliaEvent::RoutingUpdated {
|
KademliaEvent::RoutingUpdated {
|
||||||
peer: peer.clone(),
|
peer: *peer,
|
||||||
addresses,
|
addresses,
|
||||||
old_peer: None,
|
old_peer: None,
|
||||||
}
|
}
|
||||||
@ -620,7 +620,7 @@ where
|
|||||||
/// with an explicit expiration will always expire at that instant and until then
|
/// with an explicit expiration will always expire at that instant and until then
|
||||||
/// is subject to regular (re-)replication and (re-)publication.
|
/// is subject to regular (re-)replication and (re-)publication.
|
||||||
pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result<QueryId, store::Error> {
|
pub fn put_record(&mut self, mut record: Record, quorum: Quorum) -> Result<QueryId, store::Error> {
|
||||||
record.publisher = Some(self.kbuckets.local_key().preimage().clone());
|
record.publisher = Some(*self.kbuckets.local_key().preimage());
|
||||||
self.store.put(record.clone())?;
|
self.store.put(record.clone())?;
|
||||||
record.expires = record.expires.or_else(||
|
record.expires = record.expires.or_else(||
|
||||||
self.record_ttl.map(|ttl| Instant::now() + ttl));
|
self.record_ttl.map(|ttl| Instant::now() + ttl));
|
||||||
@ -682,7 +682,7 @@ where
|
|||||||
pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
|
pub fn bootstrap(&mut self) -> Result<QueryId, NoKnownPeers> {
|
||||||
let local_key = self.kbuckets.local_key().clone();
|
let local_key = self.kbuckets.local_key().clone();
|
||||||
let info = QueryInfo::Bootstrap {
|
let info = QueryInfo::Bootstrap {
|
||||||
peer: local_key.preimage().clone(),
|
peer: *local_key.preimage(),
|
||||||
remaining: None
|
remaining: None
|
||||||
};
|
};
|
||||||
let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
|
let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
|
||||||
@ -723,7 +723,7 @@ where
|
|||||||
let local_addrs = Vec::new();
|
let local_addrs = Vec::new();
|
||||||
let record = ProviderRecord::new(
|
let record = ProviderRecord::new(
|
||||||
key.clone(),
|
key.clone(),
|
||||||
self.kbuckets.local_key().preimage().clone(),
|
*self.kbuckets.local_key().preimage(),
|
||||||
local_addrs);
|
local_addrs);
|
||||||
self.store.add_provider(record)?;
|
self.store.add_provider(record)?;
|
||||||
let target = kbucket::Key::new(key.clone());
|
let target = kbucket::Key::new(key.clone());
|
||||||
@ -767,15 +767,15 @@ where
|
|||||||
where
|
where
|
||||||
I: Iterator<Item = &'a KadPeer> + Clone
|
I: Iterator<Item = &'a KadPeer> + Clone
|
||||||
{
|
{
|
||||||
let local_id = self.kbuckets.local_key().preimage().clone();
|
let local_id = self.kbuckets.local_key().preimage();
|
||||||
let others_iter = peers.filter(|p| p.node_id != local_id);
|
let others_iter = peers.filter(|p| &p.node_id != local_id);
|
||||||
if let Some(query) = self.queries.get_mut(query_id) {
|
if let Some(query) = self.queries.get_mut(query_id) {
|
||||||
log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
|
log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
|
||||||
for peer in others_iter.clone() {
|
for peer in others_iter.clone() {
|
||||||
log::trace!("Peer {:?} reported by {:?} in query {:?}.",
|
log::trace!("Peer {:?} reported by {:?} in query {:?}.",
|
||||||
peer, source, query_id);
|
peer, source, query_id);
|
||||||
let addrs = peer.multiaddrs.iter().cloned().collect();
|
let addrs = peer.multiaddrs.iter().cloned().collect();
|
||||||
query.inner.addresses.insert(peer.node_id.clone(), addrs);
|
query.inner.addresses.insert(peer.node_id, addrs);
|
||||||
}
|
}
|
||||||
query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
|
query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
|
||||||
}
|
}
|
||||||
@ -919,7 +919,7 @@ where
|
|||||||
match entry.insert(addresses.clone(), new_status) {
|
match entry.insert(addresses.clone(), new_status) {
|
||||||
kbucket::InsertResult::Inserted => {
|
kbucket::InsertResult::Inserted => {
|
||||||
let event = KademliaEvent::RoutingUpdated {
|
let event = KademliaEvent::RoutingUpdated {
|
||||||
peer: peer.clone(),
|
peer,
|
||||||
addresses,
|
addresses,
|
||||||
old_peer: None,
|
old_peer: None,
|
||||||
};
|
};
|
||||||
@ -1045,7 +1045,7 @@ where
|
|||||||
key,
|
key,
|
||||||
phase: AddProviderPhase::GetClosestPeers
|
phase: AddProviderPhase::GetClosestPeers
|
||||||
} => {
|
} => {
|
||||||
let provider_id = params.local_peer_id().clone();
|
let provider_id = *params.local_peer_id();
|
||||||
let external_addresses = params.external_addresses().map(|r| r.addr).collect();
|
let external_addresses = params.external_addresses().map(|r| r.addr).collect();
|
||||||
let inner = QueryInner::new(QueryInfo::AddProvider {
|
let inner = QueryInner::new(QueryInfo::AddProvider {
|
||||||
context,
|
context,
|
||||||
@ -1487,7 +1487,7 @@ where
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
self.connected_peers.insert(peer.clone());
|
self.connected_peers.insert(*peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_address_change(
|
fn inject_address_change(
|
||||||
@ -1594,7 +1594,7 @@ where
|
|||||||
for query in self.queries.iter_mut() {
|
for query in self.queries.iter_mut() {
|
||||||
query.on_failure(id);
|
query.on_failure(id);
|
||||||
}
|
}
|
||||||
self.connection_updated(id.clone(), None, NodeStatus::Disconnected);
|
self.connection_updated(*id, None, NodeStatus::Disconnected);
|
||||||
self.connected_peers.remove(id);
|
self.connected_peers.remove(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1612,7 +1612,7 @@ where
|
|||||||
// since the remote address on an inbound connection may be specific
|
// since the remote address on an inbound connection may be specific
|
||||||
// to that connection (e.g. typically the TCP port numbers).
|
// to that connection (e.g. typically the TCP port numbers).
|
||||||
let address = match endpoint {
|
let address = match endpoint {
|
||||||
ConnectedPoint::Dialer { address } => Some(address.clone()),
|
ConnectedPoint::Dialer { address } => Some(address),
|
||||||
ConnectedPoint::Listener { .. } => None,
|
ConnectedPoint::Listener { .. } => None,
|
||||||
};
|
};
|
||||||
self.connection_updated(source, address, NodeStatus::Connected);
|
self.connection_updated(source, address, NodeStatus::Connected);
|
||||||
@ -1725,7 +1725,7 @@ where
|
|||||||
key, records, quorum, cache_at
|
key, records, quorum, cache_at
|
||||||
} = &mut query.inner.info {
|
} = &mut query.inner.info {
|
||||||
if let Some(record) = record {
|
if let Some(record) = record {
|
||||||
records.push(PeerRecord{ peer: Some(source.clone()), record });
|
records.push(PeerRecord{ peer: Some(source), record });
|
||||||
|
|
||||||
let quorum = quorum.get();
|
let quorum = quorum.get();
|
||||||
if records.len() >= quorum {
|
if records.len() >= quorum {
|
||||||
@ -1749,7 +1749,7 @@ where
|
|||||||
// closest node to the key that did *not* return the
|
// closest node to the key that did *not* return the
|
||||||
// value is tracked in order to cache the record on
|
// value is tracked in order to cache the record on
|
||||||
// that node if the query turns out to be successful.
|
// that node if the query turns out to be successful.
|
||||||
let source_key = kbucket::Key::from(source.clone());
|
let source_key = kbucket::Key::from(source);
|
||||||
if let Some(cache_key) = cache_at {
|
if let Some(cache_key) = cache_at {
|
||||||
let key = kbucket::Key::new(key.clone());
|
let key = kbucket::Key::new(key.clone());
|
||||||
if source_key.distance(&key) < cache_key.distance(&key) {
|
if source_key.distance(&key) < cache_key.distance(&key) {
|
||||||
@ -1780,7 +1780,7 @@ where
|
|||||||
if let QueryInfo::PutRecord {
|
if let QueryInfo::PutRecord {
|
||||||
phase: PutRecordPhase::PutRecord { success, .. }, quorum, ..
|
phase: PutRecordPhase::PutRecord { success, .. }, quorum, ..
|
||||||
} = &mut query.inner.info {
|
} = &mut query.inner.info {
|
||||||
success.push(source.clone());
|
success.push(source);
|
||||||
|
|
||||||
let quorum = quorum.get();
|
let quorum = quorum.get();
|
||||||
if success.len() >= quorum {
|
if success.len() >= quorum {
|
||||||
@ -1905,7 +1905,7 @@ where
|
|||||||
peer_id, event, handler: NotifyHandler::Any
|
peer_id, event, handler: NotifyHandler::Any
|
||||||
});
|
});
|
||||||
} else if &peer_id != self.kbuckets.local_key().preimage() {
|
} else if &peer_id != self.kbuckets.local_key().preimage() {
|
||||||
query.inner.pending_rpcs.push((peer_id.clone(), event));
|
query.inner.pending_rpcs.push((peer_id, event));
|
||||||
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
|
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
|
||||||
peer_id, condition: DialPeerCondition::Disconnected
|
peer_id, condition: DialPeerCondition::Disconnected
|
||||||
});
|
});
|
||||||
@ -2423,7 +2423,7 @@ impl QueryInfo {
|
|||||||
KademliaHandlerIn::AddProvider {
|
KademliaHandlerIn::AddProvider {
|
||||||
key: key.clone(),
|
key: key.clone(),
|
||||||
provider: crate::protocol::KadPeer {
|
provider: crate::protocol::KadPeer {
|
||||||
node_id: provider_id.clone(),
|
node_id: *provider_id,
|
||||||
multiaddrs: external_addresses.clone(),
|
multiaddrs: external_addresses.clone(),
|
||||||
connection_ty: crate::protocol::KadConnectionType::Connected,
|
connection_ty: crate::protocol::KadConnectionType::Connected,
|
||||||
}
|
}
|
||||||
|
@ -167,6 +167,7 @@ fn bootstrap() {
|
|||||||
).into_iter()
|
).into_iter()
|
||||||
.map(|(_a, s)| s)
|
.map(|(_a, s)| s)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let swarm_ids: Vec<_> = swarms.iter()
|
let swarm_ids: Vec<_> = swarms.iter()
|
||||||
.map(Swarm::local_peer_id)
|
.map(Swarm::local_peer_id)
|
||||||
.cloned()
|
.cloned()
|
||||||
@ -466,7 +467,7 @@ fn put_record() {
|
|||||||
// Connect `single_swarm` to three bootnodes.
|
// Connect `single_swarm` to three bootnodes.
|
||||||
for i in 0..3 {
|
for i in 0..3 {
|
||||||
single_swarm.1.add_address(
|
single_swarm.1.add_address(
|
||||||
Swarm::local_peer_id(&fully_connected_swarms[i].1),
|
&Swarm::local_peer_id(&fully_connected_swarms[i].1),
|
||||||
fully_connected_swarms[i].0.clone(),
|
fully_connected_swarms[i].0.clone(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -745,7 +746,7 @@ fn add_provider() {
|
|||||||
// Connect `single_swarm` to three bootnodes.
|
// Connect `single_swarm` to three bootnodes.
|
||||||
for i in 0..3 {
|
for i in 0..3 {
|
||||||
single_swarm.1.add_address(
|
single_swarm.1.add_address(
|
||||||
Swarm::local_peer_id(&fully_connected_swarms[i].1),
|
&Swarm::local_peer_id(&fully_connected_swarms[i].1),
|
||||||
fully_connected_swarms[i].0.clone(),
|
fully_connected_swarms[i].0.clone(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -944,8 +945,8 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
|
|||||||
trudy.1.store.put(record_trudy.clone()).unwrap();
|
trudy.1.store.put(record_trudy.clone()).unwrap();
|
||||||
|
|
||||||
// Make `trudy` and `bob` known to `alice`.
|
// Make `trudy` and `bob` known to `alice`.
|
||||||
alice.1.add_address(Swarm::local_peer_id(&trudy.1), trudy.0.clone());
|
alice.1.add_address(&Swarm::local_peer_id(&trudy.1), trudy.0.clone());
|
||||||
alice.1.add_address(Swarm::local_peer_id(&bob.1), bob.0.clone());
|
alice.1.add_address(&Swarm::local_peer_id(&bob.1), bob.0.clone());
|
||||||
|
|
||||||
// Drop the swarm addresses.
|
// Drop the swarm addresses.
|
||||||
let (mut alice, mut bob, mut trudy) = (alice.1, bob.1, trudy.1);
|
let (mut alice, mut bob, mut trudy) = (alice.1, bob.1, trudy.1);
|
||||||
|
@ -531,7 +531,7 @@ where
|
|||||||
}
|
}
|
||||||
KademliaHandlerIn::FindNodeReq { key, user_data } => {
|
KademliaHandlerIn::FindNodeReq { key, user_data } => {
|
||||||
let msg = KadRequestMsg::FindNode { key };
|
let msg = KadRequestMsg::FindNode { key };
|
||||||
self.substreams.push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone())));
|
self.substreams.push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
|
||||||
}
|
}
|
||||||
KademliaHandlerIn::FindNodeRes {
|
KademliaHandlerIn::FindNodeRes {
|
||||||
closer_peers,
|
closer_peers,
|
||||||
@ -550,7 +550,7 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
let msg = KadResponseMsg::FindNode {
|
let msg = KadResponseMsg::FindNode {
|
||||||
closer_peers: closer_peers.clone(),
|
closer_peers,
|
||||||
};
|
};
|
||||||
self.substreams
|
self.substreams
|
||||||
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
|
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
|
||||||
@ -559,7 +559,7 @@ where
|
|||||||
KademliaHandlerIn::GetProvidersReq { key, user_data } => {
|
KademliaHandlerIn::GetProvidersReq { key, user_data } => {
|
||||||
let msg = KadRequestMsg::GetProviders { key };
|
let msg = KadRequestMsg::GetProviders { key };
|
||||||
self.substreams
|
self.substreams
|
||||||
.push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone())));
|
.push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
|
||||||
}
|
}
|
||||||
KademliaHandlerIn::GetProvidersRes {
|
KademliaHandlerIn::GetProvidersRes {
|
||||||
closer_peers,
|
closer_peers,
|
||||||
@ -582,8 +582,8 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
let msg = KadResponseMsg::GetProviders {
|
let msg = KadResponseMsg::GetProviders {
|
||||||
closer_peers: closer_peers.clone(),
|
closer_peers,
|
||||||
provider_peers: provider_peers.clone(),
|
provider_peers,
|
||||||
};
|
};
|
||||||
self.substreams
|
self.substreams
|
||||||
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
|
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
|
||||||
@ -622,7 +622,7 @@ where
|
|||||||
|
|
||||||
let msg = KadResponseMsg::GetValue {
|
let msg = KadResponseMsg::GetValue {
|
||||||
record,
|
record,
|
||||||
closer_peers: closer_peers.clone(),
|
closer_peers,
|
||||||
};
|
};
|
||||||
self.substreams
|
self.substreams
|
||||||
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
|
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
|
||||||
|
@ -224,15 +224,11 @@ impl PutRecordJob {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let PeriodicJobState::Running(records) = &mut self.inner.state {
|
if let PeriodicJobState::Running(records) = &mut self.inner.state {
|
||||||
loop {
|
for r in records {
|
||||||
if let Some(r) = records.next() {
|
if r.is_expired(now) {
|
||||||
if r.is_expired(now) {
|
store.remove(&r.key)
|
||||||
store.remove(&r.key)
|
|
||||||
} else {
|
|
||||||
return Poll::Ready(r)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
break
|
return Poll::Ready(r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -301,15 +297,11 @@ impl AddProviderJob {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let PeriodicJobState::Running(keys) = &mut self.inner.state {
|
if let PeriodicJobState::Running(keys) = &mut self.inner.state {
|
||||||
loop {
|
for r in keys {
|
||||||
if let Some(r) = keys.next() {
|
if r.is_expired(now) {
|
||||||
if r.is_expired(now) {
|
store.remove_provider(&r.key, &r.provider)
|
||||||
store.remove_provider(&r.key, &r.provider)
|
|
||||||
} else {
|
|
||||||
return Poll::Ready(r)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
break
|
return Poll::Ready(r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,6 +68,8 @@
|
|||||||
|
|
||||||
mod bucket;
|
mod bucket;
|
||||||
mod entry;
|
mod entry;
|
||||||
|
#[allow(clippy::ptr_offset_with_cast)]
|
||||||
|
#[allow(clippy::assign_op_pattern)]
|
||||||
mod key;
|
mod key;
|
||||||
|
|
||||||
pub use entry::*;
|
pub use entry::*;
|
||||||
|
@ -258,7 +258,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Updates the status of the pending node, if any.
|
/// Updates the status of the pending node, if any.
|
||||||
|
@ -207,9 +207,9 @@ impl<TInner> QueryPool<TInner> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if self.queries.is_empty() {
|
if self.queries.is_empty() {
|
||||||
return QueryPoolState::Idle
|
QueryPoolState::Idle
|
||||||
} else {
|
} else {
|
||||||
return QueryPoolState::Waiting(None)
|
QueryPoolState::Waiting(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -148,7 +148,7 @@ impl ClosestPeersIter {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
let key = Key::from(peer.clone());
|
let key = Key::from(*peer);
|
||||||
let distance = key.distance(&self.target);
|
let distance = key.distance(&self.target);
|
||||||
|
|
||||||
// Mark the peer as succeeded.
|
// Mark the peer as succeeded.
|
||||||
@ -222,7 +222,7 @@ impl ClosestPeersIter {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
let key = Key::from(peer.clone());
|
let key = Key::from(*peer);
|
||||||
let distance = key.distance(&self.target);
|
let distance = key.distance(&self.target);
|
||||||
|
|
||||||
match self.closest_peers.entry(distance) {
|
match self.closest_peers.entry(distance) {
|
||||||
|
@ -131,7 +131,7 @@ impl FixedPeersIter {
|
|||||||
|
|
||||||
pub fn next(&mut self) -> PeersIterState<'_> {
|
pub fn next(&mut self) -> PeersIterState<'_> {
|
||||||
match &mut self.state {
|
match &mut self.state {
|
||||||
State::Finished => return PeersIterState::Finished,
|
State::Finished => PeersIterState::Finished,
|
||||||
State::Waiting { num_waiting } => {
|
State::Waiting { num_waiting } => {
|
||||||
if *num_waiting >= self.parallelism.get() {
|
if *num_waiting >= self.parallelism.get() {
|
||||||
return PeersIterState::WaitingAtCapacity
|
return PeersIterState::WaitingAtCapacity
|
||||||
@ -144,7 +144,7 @@ impl FixedPeersIter {
|
|||||||
} else {
|
} else {
|
||||||
return PeersIterState::Waiting(None)
|
return PeersIterState::Waiting(None)
|
||||||
}
|
}
|
||||||
Some(p) => match self.peers.entry(p.clone()) {
|
Some(p) => match self.peers.entry(p) {
|
||||||
Entry::Occupied(_) => {} // skip duplicates
|
Entry::Occupied(_) => {} // skip duplicates
|
||||||
Entry::Vacant(e) => {
|
Entry::Vacant(e) => {
|
||||||
*num_waiting += 1;
|
*num_waiting += 1;
|
||||||
|
@ -205,7 +205,7 @@ impl<'a> RecordStore<'a> for MemoryStore {
|
|||||||
let p = providers.remove(i);
|
let p = providers.remove(i);
|
||||||
self.provided.remove(&p);
|
self.provided.remove(&p);
|
||||||
}
|
}
|
||||||
if providers.len() == 0 {
|
if providers.is_empty() {
|
||||||
e.remove();
|
e.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -201,8 +201,8 @@ impl NetworkBehaviour for Mdns {
|
|||||||
if let MdnsBusyWrapper::Free(ref mut service) = self.service {
|
if let MdnsBusyWrapper::Free(ref mut service) = self.service {
|
||||||
for packet in build_query_response(
|
for packet in build_query_response(
|
||||||
query.query_id(),
|
query.query_id(),
|
||||||
params.local_peer_id().clone(),
|
*params.local_peer_id(),
|
||||||
params.listened_addresses().into_iter(),
|
params.listened_addresses(),
|
||||||
MDNS_RESPONSE_TTL,
|
MDNS_RESPONSE_TTL,
|
||||||
) {
|
) {
|
||||||
service.enqueue_response(packet)
|
service.enqueue_response(packet)
|
||||||
@ -240,10 +240,10 @@ impl NetworkBehaviour for Mdns {
|
|||||||
{
|
{
|
||||||
*cur_expires = cmp::max(*cur_expires, new_expiration);
|
*cur_expires = cmp::max(*cur_expires, new_expiration);
|
||||||
} else {
|
} else {
|
||||||
self.discovered_nodes.push((peer.id().clone(), addr.clone(), new_expiration));
|
self.discovered_nodes.push((*peer.id(), addr.clone(), new_expiration));
|
||||||
}
|
}
|
||||||
|
|
||||||
discovered.push((peer.id().clone(), addr));
|
discovered.push((*peer.id(), addr));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,7 +114,7 @@ pub fn build_query_response(
|
|||||||
let ttl = duration_to_secs(ttl);
|
let ttl = duration_to_secs(ttl);
|
||||||
|
|
||||||
// Add a limit to 2^16-1 addresses, as the protocol limits to this number.
|
// Add a limit to 2^16-1 addresses, as the protocol limits to this number.
|
||||||
let mut addresses = addresses.take(65535);
|
let addresses = addresses.take(65535);
|
||||||
|
|
||||||
let peer_id_bytes = encode_peer_id(&peer_id);
|
let peer_id_bytes = encode_peer_id(&peer_id);
|
||||||
debug_assert!(peer_id_bytes.len() <= 0xffff);
|
debug_assert!(peer_id_bytes.len() <= 0xffff);
|
||||||
@ -127,7 +127,7 @@ pub fn build_query_response(
|
|||||||
|
|
||||||
// Encode the addresses as TXT records, and multiple TXT records into a
|
// Encode the addresses as TXT records, and multiple TXT records into a
|
||||||
// response packet.
|
// response packet.
|
||||||
while let Some(addr) = addresses.next() {
|
for addr in addresses {
|
||||||
let txt_to_send = format!("dnsaddr={}/p2p/{}", addr.to_string(), peer_id.to_base58());
|
let txt_to_send = format!("dnsaddr={}/p2p/{}", addr.to_string(), peer_id.to_base58());
|
||||||
let mut txt_record = Vec::with_capacity(txt_to_send.len());
|
let mut txt_record = Vec::with_capacity(txt_to_send.len());
|
||||||
match append_txt_record(&mut txt_record, &peer_id_bytes, ttl, &txt_to_send) {
|
match append_txt_record(&mut txt_record, &peer_id_bytes, ttl, &txt_to_send) {
|
||||||
@ -203,7 +203,7 @@ pub fn build_service_discovery_response(id: u16, ttl: Duration) -> MdnsPacket {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Constructs an MDNS query response packet for an address lookup.
|
/// Constructs an MDNS query response packet for an address lookup.
|
||||||
fn query_response_packet(id: u16, peer_id: &Vec<u8>, records: &Vec<Vec<u8>>, ttl: u32) -> MdnsPacket {
|
fn query_response_packet(id: u16, peer_id: &[u8], records: &[Vec<u8>], ttl: u32) -> MdnsPacket {
|
||||||
let mut out = Vec::with_capacity(records.len() * MAX_TXT_RECORD_SIZE);
|
let mut out = Vec::with_capacity(records.len() * MAX_TXT_RECORD_SIZE);
|
||||||
|
|
||||||
append_u16(&mut out, id);
|
append_u16(&mut out, id);
|
||||||
@ -347,7 +347,7 @@ fn append_character_string(out: &mut Vec<u8>, ascii_str: &str) -> Result<(), Mdn
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Appends a TXT record to `out`.
|
/// Appends a TXT record to `out`.
|
||||||
fn append_txt_record<'a>(
|
fn append_txt_record(
|
||||||
out: &mut Vec<u8>,
|
out: &mut Vec<u8>,
|
||||||
name: &[u8],
|
name: &[u8],
|
||||||
ttl_secs: u32,
|
ttl_secs: u32,
|
||||||
|
@ -331,7 +331,7 @@ impl MdnsPacket {
|
|||||||
from,
|
from,
|
||||||
query_id: packet.header.id,
|
query_id: packet.header.id,
|
||||||
});
|
});
|
||||||
return Some(query);
|
Some(query)
|
||||||
} else if packet
|
} else if packet
|
||||||
.questions
|
.questions
|
||||||
.iter()
|
.iter()
|
||||||
@ -344,21 +344,21 @@ impl MdnsPacket {
|
|||||||
query_id: packet.header.id,
|
query_id: packet.header.id,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
return Some(discovery);
|
Some(discovery)
|
||||||
} else {
|
} else {
|
||||||
return None;
|
None
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let resp = MdnsPacket::Response(MdnsResponse::new (
|
let resp = MdnsPacket::Response(MdnsResponse::new (
|
||||||
packet,
|
packet,
|
||||||
from,
|
from,
|
||||||
));
|
));
|
||||||
return Some(resp);
|
Some(resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("Parsing mdns packet failed: {:?}", err);
|
warn!("Parsing mdns packet failed: {:?}", err);
|
||||||
return None;
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -377,10 +377,10 @@ where
|
|||||||
|
|
||||||
if let Some(request) = self.try_send_request(peer, request) {
|
if let Some(request) = self.try_send_request(peer, request) {
|
||||||
self.pending_events.push_back(NetworkBehaviourAction::DialPeer {
|
self.pending_events.push_back(NetworkBehaviourAction::DialPeer {
|
||||||
peer_id: peer.clone(),
|
peer_id: *peer,
|
||||||
condition: DialPeerCondition::Disconnected,
|
condition: DialPeerCondition::Disconnected,
|
||||||
});
|
});
|
||||||
self.pending_outbound_requests.entry(peer.clone()).or_default().push(request);
|
self.pending_outbound_requests.entry(*peer).or_default().push(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
request_id
|
request_id
|
||||||
@ -409,7 +409,7 @@ where
|
|||||||
///
|
///
|
||||||
/// Addresses added in this way are only removed by `remove_address`.
|
/// Addresses added in this way are only removed by `remove_address`.
|
||||||
pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) {
|
pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) {
|
||||||
self.addresses.entry(peer.clone()).or_default().push(address);
|
self.addresses.entry(*peer).or_default().push(address);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes an address of a peer previously added via `add_address`.
|
/// Removes an address of a peer previously added via `add_address`.
|
||||||
@ -479,7 +479,7 @@ where
|
|||||||
let conn = &mut connections[ix];
|
let conn = &mut connections[ix];
|
||||||
conn.pending_inbound_responses.insert(request.request_id);
|
conn.pending_inbound_responses.insert(request.request_id);
|
||||||
self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||||
peer_id: peer.clone(),
|
peer_id: *peer,
|
||||||
handler: NotifyHandler::One(conn.id),
|
handler: NotifyHandler::One(conn.id),
|
||||||
event: request
|
event: request
|
||||||
});
|
});
|
||||||
@ -576,7 +576,7 @@ where
|
|||||||
ConnectedPoint::Dialer { address } => Some(address.clone()),
|
ConnectedPoint::Dialer { address } => Some(address.clone()),
|
||||||
ConnectedPoint::Listener { .. } => None
|
ConnectedPoint::Listener { .. } => None
|
||||||
};
|
};
|
||||||
self.connected.entry(peer.clone())
|
self.connected.entry(*peer)
|
||||||
.or_default()
|
.or_default()
|
||||||
.push(Connection::new(*conn, address));
|
.push(Connection::new(*conn, address));
|
||||||
}
|
}
|
||||||
@ -597,7 +597,7 @@ where
|
|||||||
for request_id in connection.pending_outbound_responses {
|
for request_id in connection.pending_outbound_responses {
|
||||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
RequestResponseEvent::InboundFailure {
|
RequestResponseEvent::InboundFailure {
|
||||||
peer: peer_id.clone(),
|
peer: *peer_id,
|
||||||
request_id,
|
request_id,
|
||||||
error: InboundFailure::ConnectionClosed
|
error: InboundFailure::ConnectionClosed
|
||||||
}
|
}
|
||||||
@ -608,7 +608,7 @@ where
|
|||||||
for request_id in connection.pending_inbound_responses {
|
for request_id in connection.pending_inbound_responses {
|
||||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
RequestResponseEvent::OutboundFailure {
|
RequestResponseEvent::OutboundFailure {
|
||||||
peer: peer_id.clone(),
|
peer: *peer_id,
|
||||||
request_id,
|
request_id,
|
||||||
error: OutboundFailure::ConnectionClosed
|
error: OutboundFailure::ConnectionClosed
|
||||||
}
|
}
|
||||||
@ -631,7 +631,7 @@ where
|
|||||||
for request in pending {
|
for request in pending {
|
||||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
RequestResponseEvent::OutboundFailure {
|
RequestResponseEvent::OutboundFailure {
|
||||||
peer: peer.clone(),
|
peer: *peer,
|
||||||
request_id: request.request_id,
|
request_id: request.request_id,
|
||||||
error: OutboundFailure::DialFailure
|
error: OutboundFailure::DialFailure
|
||||||
}
|
}
|
||||||
@ -660,10 +660,10 @@ where
|
|||||||
RequestResponseEvent::Message { peer, message }));
|
RequestResponseEvent::Message { peer, message }));
|
||||||
}
|
}
|
||||||
RequestResponseHandlerEvent::Request { request_id, request, sender } => {
|
RequestResponseHandlerEvent::Request { request_id, request, sender } => {
|
||||||
let channel = ResponseChannel { request_id, peer: peer.clone(), sender };
|
let channel = ResponseChannel { request_id, peer, sender };
|
||||||
let message = RequestResponseMessage::Request { request_id, request, channel };
|
let message = RequestResponseMessage::Request { request_id, request, channel };
|
||||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
RequestResponseEvent::Message { peer: peer.clone(), message }
|
RequestResponseEvent::Message { peer, message }
|
||||||
));
|
));
|
||||||
|
|
||||||
match self.get_connection_mut(&peer, connection) {
|
match self.get_connection_mut(&peer, connection) {
|
||||||
@ -675,7 +675,7 @@ where
|
|||||||
None => {
|
None => {
|
||||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
|
||||||
RequestResponseEvent::InboundFailure {
|
RequestResponseEvent::InboundFailure {
|
||||||
peer: peer.clone(),
|
peer,
|
||||||
request_id,
|
request_id,
|
||||||
error: InboundFailure::ConnectionClosed
|
error: InboundFailure::ConnectionClosed
|
||||||
}
|
}
|
||||||
|
@ -251,7 +251,7 @@ where
|
|||||||
} else if let Some(info) = self.offline_peer_info.get_mut(p) {
|
} else if let Some(info) = self.offline_peer_info.get_mut(p) {
|
||||||
info.recv_budget.limit.set(limit)
|
info.recv_budget.limit.set(limit)
|
||||||
}
|
}
|
||||||
self.limit_overrides.insert(p.clone(), Limit::new(limit));
|
self.limit_overrides.insert(*p, Limit::new(limit));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove any limit overrides for the given peer.
|
/// Remove any limit overrides for the given peer.
|
||||||
@ -286,7 +286,7 @@ where
|
|||||||
let mut info = PeerInfo::new(limit);
|
let mut info = PeerInfo::new(limit);
|
||||||
info.send_budget.remaining -= 1;
|
info.send_budget.remaining -= 1;
|
||||||
let remaining = info.send_budget.remaining;
|
let remaining = info.send_budget.remaining;
|
||||||
self.offline_peer_info.put(p.clone(), info);
|
self.offline_peer_info.put(*p, info);
|
||||||
remaining
|
remaining
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -428,13 +428,13 @@ where
|
|||||||
if !self.peer_info.contains_key(p) {
|
if !self.peer_info.contains_key(p) {
|
||||||
if let Some(info) = self.offline_peer_info.pop(p) {
|
if let Some(info) = self.offline_peer_info.pop(p) {
|
||||||
let recv_budget = info.recv_budget.remaining;
|
let recv_budget = info.recv_budget.remaining;
|
||||||
self.peer_info.insert(p.clone(), info);
|
self.peer_info.insert(*p, info);
|
||||||
if recv_budget > 1 {
|
if recv_budget > 1 {
|
||||||
self.send_credit(p, recv_budget - 1);
|
self.send_credit(p, recv_budget - 1);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit);
|
let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit);
|
||||||
self.peer_info.insert(p.clone(), PeerInfo::new(limit));
|
self.peer_info.insert(*p, PeerInfo::new(limit));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -442,7 +442,7 @@ where
|
|||||||
fn inject_disconnected(&mut self, p: &PeerId) {
|
fn inject_disconnected(&mut self, p: &PeerId) {
|
||||||
log::trace!("{:08x}: disconnected from {}", self.id, p);
|
log::trace!("{:08x}: disconnected from {}", self.id, p);
|
||||||
if let Some(info) = self.peer_info.remove(p) {
|
if let Some(info) = self.peer_info.remove(p) {
|
||||||
self.offline_peer_info.put(p.clone(), info.into_disconnected());
|
self.offline_peer_info.put(*p, info.into_disconnected());
|
||||||
}
|
}
|
||||||
self.behaviour.inject_disconnected(p)
|
self.behaviour.inject_disconnected(p)
|
||||||
}
|
}
|
||||||
@ -528,7 +528,7 @@ where
|
|||||||
if info.send_budget.grant < Some(id) {
|
if info.send_budget.grant < Some(id) {
|
||||||
if info.send_budget.remaining == 0 && credit > 0 {
|
if info.send_budget.remaining == 0 && credit > 0 {
|
||||||
log::trace!("{:08x}: sending to peer {} can resume", self.id, peer);
|
log::trace!("{:08x}: sending to peer {} can resume", self.id, peer);
|
||||||
self.events.push_back(Event::ResumeSending(peer.clone()))
|
self.events.push_back(Event::ResumeSending(peer))
|
||||||
}
|
}
|
||||||
info.send_budget.remaining += credit;
|
info.send_budget.remaining += credit;
|
||||||
info.send_budget.grant = Some(id);
|
info.send_budget.grant = Some(id);
|
||||||
@ -549,7 +549,7 @@ where
|
|||||||
};
|
};
|
||||||
if info.recv_budget.remaining == 0 {
|
if info.recv_budget.remaining == 0 {
|
||||||
log::debug!("{:08x}: peer {} exceeds its budget", self.id, peer);
|
log::debug!("{:08x}: peer {} exceeds its budget", self.id, peer);
|
||||||
self.events.push_back(Event::TooManyInboundRequests(peer.clone()));
|
self.events.push_back(Event::TooManyInboundRequests(peer));
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
info.recv_budget.remaining -= 1;
|
info.recv_budget.remaining -= 1;
|
||||||
|
@ -383,7 +383,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
|||||||
let handler = me.behaviour.new_handler()
|
let handler = me.behaviour.new_handler()
|
||||||
.into_node_handler_builder()
|
.into_node_handler_builder()
|
||||||
.with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
|
.with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
|
||||||
me.network.peer(peer_id.clone())
|
me.network.peer(*peer_id)
|
||||||
.dial(first, addrs, handler)
|
.dial(first, addrs, handler)
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
.map_err(DialError::ConnectionLimit)
|
.map_err(DialError::ConnectionLimit)
|
||||||
@ -408,7 +408,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
|||||||
|
|
||||||
/// Returns the peer ID of the swarm passed as parameter.
|
/// Returns the peer ID of the swarm passed as parameter.
|
||||||
pub fn local_peer_id(me: &Self) -> &PeerId {
|
pub fn local_peer_id(me: &Self) -> &PeerId {
|
||||||
&me.network.local_peer_id()
|
me.network.local_peer_id()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns an iterator for [`AddressRecord`]s of external addresses
|
/// Returns an iterator for [`AddressRecord`]s of external addresses
|
||||||
@ -451,7 +451,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
|||||||
/// Any incoming connection and any dialing attempt will immediately be rejected.
|
/// Any incoming connection and any dialing attempt will immediately be rejected.
|
||||||
/// This function has no effect if the peer is already banned.
|
/// This function has no effect if the peer is already banned.
|
||||||
pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) {
|
pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) {
|
||||||
if me.banned_peers.insert(peer_id.clone()) {
|
if me.banned_peers.insert(peer_id) {
|
||||||
if let Some(peer) = me.network.peer(peer_id).into_connected() {
|
if let Some(peer) = me.network.peer(peer_id).into_connected() {
|
||||||
peer.disconnect();
|
peer.disconnect();
|
||||||
}
|
}
|
||||||
@ -504,7 +504,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
|||||||
match this.network.poll(cx) {
|
match this.network.poll(cx) {
|
||||||
Poll::Pending => network_not_ready = true,
|
Poll::Pending => network_not_ready = true,
|
||||||
Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => {
|
Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => {
|
||||||
let peer = connection.peer_id().clone();
|
let peer = connection.peer_id();
|
||||||
let connection = connection.id();
|
let connection = connection.id();
|
||||||
this.behaviour.inject_event(peer, connection, event);
|
this.behaviour.inject_event(peer, connection, event);
|
||||||
},
|
},
|
||||||
@ -514,10 +514,10 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
|||||||
this.behaviour.inject_address_change(&peer, &connection, &old_endpoint, &new_endpoint);
|
this.behaviour.inject_address_change(&peer, &connection, &old_endpoint, &new_endpoint);
|
||||||
},
|
},
|
||||||
Poll::Ready(NetworkEvent::ConnectionEstablished { connection, num_established }) => {
|
Poll::Ready(NetworkEvent::ConnectionEstablished { connection, num_established }) => {
|
||||||
let peer_id = connection.peer_id().clone();
|
let peer_id = connection.peer_id();
|
||||||
let endpoint = connection.endpoint().clone();
|
let endpoint = connection.endpoint().clone();
|
||||||
if this.banned_peers.contains(&peer_id) {
|
if this.banned_peers.contains(&peer_id) {
|
||||||
this.network.peer(peer_id.clone())
|
this.network.peer(peer_id)
|
||||||
.into_connected()
|
.into_connected()
|
||||||
.expect("the Network just notified us that we were connected; QED")
|
.expect("the Network just notified us that we were connected; QED")
|
||||||
.disconnect();
|
.disconnect();
|
||||||
@ -645,7 +645,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
|||||||
// before polling the behaviour again. If the targeted peer
|
// before polling the behaviour again. If the targeted peer
|
||||||
// meanwhie disconnected, the event is discarded.
|
// meanwhie disconnected, the event is discarded.
|
||||||
if let Some((peer_id, handler, event)) = this.pending_event.take() {
|
if let Some((peer_id, handler, event)) = this.pending_event.take() {
|
||||||
if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
|
if let Some(mut peer) = this.network.peer(peer_id).into_connected() {
|
||||||
match handler {
|
match handler {
|
||||||
PendingNotifyHandler::One(conn_id) =>
|
PendingNotifyHandler::One(conn_id) =>
|
||||||
if let Some(mut conn) = peer.connection(conn_id) {
|
if let Some(mut conn) = peer.connection(conn_id) {
|
||||||
@ -706,7 +706,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
|||||||
log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
|
log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
|
||||||
peer_id, condition);
|
peer_id, condition);
|
||||||
let self_listening = &this.listened_addrs;
|
let self_listening = &this.listened_addrs;
|
||||||
if let Some(mut peer) = this.network.peer(peer_id.clone()).into_dialing() {
|
if let Some(mut peer) = this.network.peer(peer_id).into_dialing() {
|
||||||
let addrs = this.behaviour.addresses_of_peer(peer.id());
|
let addrs = this.behaviour.addresses_of_peer(peer.id());
|
||||||
let mut attempt = peer.some_attempt();
|
let mut attempt = peer.some_attempt();
|
||||||
for a in addrs {
|
for a in addrs {
|
||||||
@ -719,7 +719,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => {
|
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => {
|
||||||
if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
|
if let Some(mut peer) = this.network.peer(peer_id).into_connected() {
|
||||||
match handler {
|
match handler {
|
||||||
NotifyHandler::One(connection) => {
|
NotifyHandler::One(connection) => {
|
||||||
if let Some(mut conn) = peer.connection(connection) {
|
if let Some(mut conn) = peer.connection(connection) {
|
||||||
@ -743,7 +743,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
|
|||||||
},
|
},
|
||||||
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
|
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
|
||||||
for addr in this.network.address_translation(&address) {
|
for addr in this.network.address_translation(&address) {
|
||||||
if this.external_addrs.iter().all(|a| &a.addr != &addr) {
|
if this.external_addrs.iter().all(|a| a.addr != addr) {
|
||||||
this.behaviour.inject_new_external_addr(&addr);
|
this.behaviour.inject_new_external_addr(&addr);
|
||||||
}
|
}
|
||||||
this.external_addrs.add(addr, score);
|
this.external_addrs.add(addr, score);
|
||||||
@ -898,7 +898,7 @@ impl<'a> PollParameters for SwarmPollParameters<'a> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn local_peer_id(&self) -> &PeerId {
|
fn local_peer_id(&self) -> &PeerId {
|
||||||
self.local_peer_id
|
&self.local_peer_id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -925,7 +925,7 @@ where TBehaviour: NetworkBehaviour,
|
|||||||
) -> Self {
|
) -> Self {
|
||||||
SwarmBuilder {
|
SwarmBuilder {
|
||||||
local_peer_id,
|
local_peer_id,
|
||||||
transport: transport,
|
transport,
|
||||||
behaviour,
|
behaviour,
|
||||||
network_config: Default::default(),
|
network_config: Default::default(),
|
||||||
substream_upgrade_protocol_override: None,
|
substream_upgrade_protocol_override: None,
|
||||||
|
@ -106,11 +106,11 @@ where
|
|||||||
|
|
||||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||||
let (upgrade, info, timeout, version) = self.handlers.iter()
|
let (upgrade, info, timeout, version) = self.handlers.iter()
|
||||||
.map(|(k, h)| {
|
.map(|(key, handler)| {
|
||||||
let p = h.listen_protocol();
|
let proto = handler.listen_protocol();
|
||||||
let t = *p.timeout();
|
let timeout = *proto.timeout();
|
||||||
let (v, u, i) = p.into_upgrade();
|
let (version, upgrade, info) = proto.into_upgrade();
|
||||||
(k.clone(), (v, u, i, t))
|
(key.clone(), (version, upgrade, info, timeout))
|
||||||
})
|
})
|
||||||
.fold((Upgrade::new(), Info::new(), Duration::from_secs(0), None),
|
.fold((Upgrade::new(), Info::new(), Duration::from_secs(0), None),
|
||||||
|(mut upg, mut inf, mut timeout, mut version), (k, (v, u, i, t))| {
|
|(mut upg, mut inf, mut timeout, mut version), (k, (v, u, i, t))| {
|
||||||
|
@ -245,7 +245,7 @@ where
|
|||||||
match endpoint {
|
match endpoint {
|
||||||
SubstreamEndpoint::Listener => {
|
SubstreamEndpoint::Listener => {
|
||||||
let protocol = self.handler.listen_protocol();
|
let protocol = self.handler.listen_protocol();
|
||||||
let timeout = protocol.timeout().clone();
|
let timeout = *protocol.timeout();
|
||||||
let (_, upgrade, user_data) = protocol.into_upgrade();
|
let (_, upgrade, user_data) = protocol.into_upgrade();
|
||||||
let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade));
|
let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade));
|
||||||
let timeout = Delay::new(timeout);
|
let timeout = Delay::new(timeout);
|
||||||
@ -334,7 +334,7 @@ where
|
|||||||
}
|
}
|
||||||
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => {
|
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => {
|
||||||
let id = self.unique_dial_upgrade_id;
|
let id = self.unique_dial_upgrade_id;
|
||||||
let timeout = protocol.timeout().clone();
|
let timeout = *protocol.timeout();
|
||||||
self.unique_dial_upgrade_id += 1;
|
self.unique_dial_upgrade_id += 1;
|
||||||
let (version, upgrade, info) = protocol.into_upgrade();
|
let (version, upgrade, info) = protocol.into_upgrade();
|
||||||
self.queued_dial_upgrades.push((id, (version, SendWrapper(upgrade))));
|
self.queued_dial_upgrades.push((id, (version, SendWrapper(upgrade))));
|
||||||
|
@ -110,7 +110,7 @@ where
|
|||||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||||
let proto1 = self.proto1.listen_protocol();
|
let proto1 = self.proto1.listen_protocol();
|
||||||
let proto2 = self.proto2.listen_protocol();
|
let proto2 = self.proto2.listen_protocol();
|
||||||
let timeout = std::cmp::max(proto1.timeout(), proto2.timeout()).clone();
|
let timeout = *std::cmp::max(proto1.timeout(), proto2.timeout());
|
||||||
let (_, u1, i1) = proto1.into_upgrade();
|
let (_, u1, i1) = proto1.into_upgrade();
|
||||||
let (_, u2, i2) = proto2.into_upgrade();
|
let (_, u2, i2) = proto2.into_upgrade();
|
||||||
let choice = SelectUpgrade::new(SendWrapper(u1), SendWrapper(u2));
|
let choice = SelectUpgrade::new(SendWrapper(u1), SendWrapper(u2));
|
||||||
|
@ -241,7 +241,7 @@ where
|
|||||||
.expect("Can't receive an inbound substream if disabled; QED")
|
.expect("Can't receive an inbound substream if disabled; QED")
|
||||||
.inject_fully_negotiated_inbound(out, info)
|
.inject_fully_negotiated_inbound(out, info)
|
||||||
} else {
|
} else {
|
||||||
panic!("Unpexpected Either::Right in enabled `inject_fully_negotiated_inbound`.")
|
panic!("Unexpected Either::Right in enabled `inject_fully_negotiated_inbound`.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,6 +24,7 @@ use std::{error::Error, fmt, io};
|
|||||||
|
|
||||||
/// libp2p_noise error type.
|
/// libp2p_noise error type.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
#[non_exhaustive]
|
||||||
pub enum NoiseError {
|
pub enum NoiseError {
|
||||||
/// An I/O error has been encountered.
|
/// An I/O error has been encountered.
|
||||||
Io(io::Error),
|
Io(io::Error),
|
||||||
@ -38,8 +39,6 @@ pub enum NoiseError {
|
|||||||
InvalidPayload(prost::DecodeError),
|
InvalidPayload(prost::DecodeError),
|
||||||
/// A signature was required and could not be created.
|
/// A signature was required and could not be created.
|
||||||
SigningError(identity::error::SigningError),
|
SigningError(identity::error::SigningError),
|
||||||
#[doc(hidden)]
|
|
||||||
__Nonexhaustive
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for NoiseError {
|
impl fmt::Display for NoiseError {
|
||||||
@ -51,7 +50,6 @@ impl fmt::Display for NoiseError {
|
|||||||
NoiseError::InvalidPayload(e) => write!(f, "{}", e),
|
NoiseError::InvalidPayload(e) => write!(f, "{}", e),
|
||||||
NoiseError::AuthenticationFailed => f.write_str("Authentication failed"),
|
NoiseError::AuthenticationFailed => f.write_str("Authentication failed"),
|
||||||
NoiseError::SigningError(e) => write!(f, "{}", e),
|
NoiseError::SigningError(e) => write!(f, "{}", e),
|
||||||
NoiseError::__Nonexhaustive => f.write_str("__Nonexhaustive")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -65,7 +63,6 @@ impl Error for NoiseError {
|
|||||||
NoiseError::AuthenticationFailed => None,
|
NoiseError::AuthenticationFailed => None,
|
||||||
NoiseError::InvalidPayload(e) => Some(e),
|
NoiseError::InvalidPayload(e) => Some(e),
|
||||||
NoiseError::SigningError(e) => Some(e),
|
NoiseError::SigningError(e) => Some(e),
|
||||||
NoiseError::__Nonexhaustive => None
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -115,7 +115,7 @@ impl<T: AsyncWrite + Unpin> AsyncWrite for NoiseOutput<T> {
|
|||||||
this.send_offset += n;
|
this.send_offset += n;
|
||||||
trace!("write: buffered {} bytes", this.send_offset);
|
trace!("write: buffered {} bytes", this.send_offset);
|
||||||
|
|
||||||
return Poll::Ready(Ok(n))
|
Poll::Ready(Ok(n))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
@ -27,7 +27,6 @@ use crate::io::NoiseOutput;
|
|||||||
use futures::ready;
|
use futures::ready;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use log::{debug, trace};
|
use log::{debug, trace};
|
||||||
use snow;
|
|
||||||
use std::{fmt, io, pin::Pin, task::{Context, Poll}};
|
use std::{fmt, io, pin::Pin, task::{Context, Poll}};
|
||||||
|
|
||||||
/// Max. size of a noise message.
|
/// Max. size of a noise message.
|
||||||
@ -261,9 +260,9 @@ where
|
|||||||
WriteState::Ready => {
|
WriteState::Ready => {
|
||||||
return Poll::Ready(Ok(()));
|
return Poll::Ready(Ok(()));
|
||||||
}
|
}
|
||||||
WriteState::WriteLen { len, mut buf, mut off } => {
|
WriteState::WriteLen { len, buf, mut off } => {
|
||||||
trace!("write: frame len ({}, {:?}, {}/2)", len, buf, off);
|
trace!("write: frame len ({}, {:?}, {}/2)", len, buf, off);
|
||||||
match write_frame_len(&mut this.io, cx, &mut buf, &mut off) {
|
match write_frame_len(&mut this.io, cx, &buf, &mut off) {
|
||||||
Poll::Ready(Ok(true)) => (),
|
Poll::Ready(Ok(true)) => (),
|
||||||
Poll::Ready(Ok(false)) => {
|
Poll::Ready(Ok(false)) => {
|
||||||
trace!("write: eof");
|
trace!("write: eof");
|
||||||
@ -324,12 +323,12 @@ where
|
|||||||
buf: u16::to_be_bytes(n as u16),
|
buf: u16::to_be_bytes(n as u16),
|
||||||
off: 0
|
off: 0
|
||||||
};
|
};
|
||||||
return Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
log::error!("encryption error: {:?}", e);
|
log::error!("encryption error: {:?}", e);
|
||||||
this.write_state = WriteState::EncErr;
|
this.write_state = WriteState::EncErr;
|
||||||
return Err(io::ErrorKind::InvalidData.into())
|
Err(io::ErrorKind::InvalidData.into())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -386,7 +386,7 @@ where
|
|||||||
};
|
};
|
||||||
log::debug!("listening on {}", socket_addr);
|
log::debug!("listening on {}", socket_addr);
|
||||||
self.do_listen(socket_addr)
|
self.do_listen(socket_addr)
|
||||||
.map_err(|e| TransportError::Other(e))
|
.map_err(TransportError::Other)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
||||||
@ -582,24 +582,20 @@ where
|
|||||||
match ev {
|
match ev {
|
||||||
Ok(IfEvent::Up(inet)) => {
|
Ok(IfEvent::Up(inet)) => {
|
||||||
let ip = inet.addr();
|
let ip = inet.addr();
|
||||||
if me.listen_addr.is_ipv4() == ip.is_ipv4() {
|
if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.insert(ip) {
|
||||||
if addrs.insert(ip) {
|
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
|
||||||
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
|
log::debug!("New listen address: {}", ma);
|
||||||
log::debug!("New listen address: {}", ma);
|
me.port_reuse.register(ip, me.listen_addr.port());
|
||||||
me.port_reuse.register(ip, me.listen_addr.port());
|
return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(ma))));
|
||||||
return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(ma))));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(IfEvent::Down(inet)) => {
|
Ok(IfEvent::Down(inet)) => {
|
||||||
let ip = inet.addr();
|
let ip = inet.addr();
|
||||||
if me.listen_addr.is_ipv4() == ip.is_ipv4() {
|
if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.remove(&ip) {
|
||||||
if addrs.remove(&ip) {
|
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
|
||||||
let ma = ip_to_multiaddr(ip, me.listen_addr.port());
|
log::debug!("Expired listen address: {}", ma);
|
||||||
log::debug!("Expired listen address: {}", ma);
|
me.port_reuse.unregister(ip, me.listen_addr.port());
|
||||||
me.port_reuse.unregister(ip, me.listen_addr.port());
|
return Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(ma))));
|
||||||
return Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(ma))));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
@ -484,11 +484,7 @@ impl Drop for Connection {
|
|||||||
/// Returns true if `err` is an error about an address not being supported.
|
/// Returns true if `err` is an error about an address not being supported.
|
||||||
fn is_not_supported_error(err: &JsValue) -> bool {
|
fn is_not_supported_error(err: &JsValue) -> bool {
|
||||||
if let Some(err) = err.dyn_ref::<js_sys::Error>() {
|
if let Some(err) = err.dyn_ref::<js_sys::Error>() {
|
||||||
if String::from(err.name()) == "NotSupportedError" {
|
err.name() == "NotSupportedError"
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
@ -130,6 +130,7 @@ pub(crate) fn dns_name_ref(name: &str) -> Result<webpki::DNSNameRef<'_>, Error>
|
|||||||
|
|
||||||
/// TLS related errors.
|
/// TLS related errors.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
#[non_exhaustive]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
/// An underlying I/O error.
|
/// An underlying I/O error.
|
||||||
Io(io::Error),
|
Io(io::Error),
|
||||||
@ -137,9 +138,6 @@ pub enum Error {
|
|||||||
Tls(Box<dyn std::error::Error + Send + Sync>),
|
Tls(Box<dyn std::error::Error + Send + Sync>),
|
||||||
/// The DNS name was invalid.
|
/// The DNS name was invalid.
|
||||||
InvalidDnsName(String),
|
InvalidDnsName(String),
|
||||||
|
|
||||||
#[doc(hidden)]
|
|
||||||
__Nonexhaustive
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Display for Error {
|
impl fmt::Display for Error {
|
||||||
@ -148,7 +146,6 @@ impl fmt::Display for Error {
|
|||||||
Error::Io(e) => write!(f, "i/o error: {}", e),
|
Error::Io(e) => write!(f, "i/o error: {}", e),
|
||||||
Error::Tls(e) => write!(f, "tls error: {}", e),
|
Error::Tls(e) => write!(f, "tls error: {}", e),
|
||||||
Error::InvalidDnsName(n) => write!(f, "invalid DNS name: {}", n),
|
Error::InvalidDnsName(n) => write!(f, "invalid DNS name: {}", n),
|
||||||
Error::__Nonexhaustive => f.write_str("__Nonexhaustive")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -158,7 +155,7 @@ impl std::error::Error for Error {
|
|||||||
match self {
|
match self {
|
||||||
Error::Io(e) => Some(e),
|
Error::Io(e) => Some(e),
|
||||||
Error::Tls(e) => Some(&**e),
|
Error::Tls(e) => Some(&**e),
|
||||||
Error::InvalidDnsName(_) | Error::__Nonexhaustive => None
|
Error::InvalidDnsName(_) => None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user