mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-18 04:21:22 +00:00
Fix several errors reported by clippy. (#715)
This commit is contained in:
@ -634,75 +634,71 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Poll the existing nodes.
|
// Poll the existing nodes.
|
||||||
loop {
|
let (action, out_event);
|
||||||
let (action, out_event);
|
match self.active_nodes.poll() {
|
||||||
match self.active_nodes.poll() {
|
Async::NotReady => return Async::NotReady,
|
||||||
Async::NotReady => break,
|
Async::Ready(CollectionEvent::NodeReached(reach_event)) => {
|
||||||
Async::Ready(CollectionEvent::NodeReached(reach_event)) => {
|
let (a, e) = handle_node_reached(&mut self.reach_attempts, reach_event);
|
||||||
let (a, e) = handle_node_reached(&mut self.reach_attempts, reach_event);
|
action = a;
|
||||||
action = a;
|
out_event = e;
|
||||||
out_event = e;
|
}
|
||||||
}
|
Async::Ready(CollectionEvent::ReachError { id, error, handler }) => {
|
||||||
Async::Ready(CollectionEvent::ReachError { id, error, handler }) => {
|
let (a, e) = handle_reach_error(&mut self.reach_attempts, id, error, handler);
|
||||||
let (a, e) = handle_reach_error(&mut self.reach_attempts, id, error, handler);
|
action = a;
|
||||||
action = a;
|
out_event = e;
|
||||||
out_event = e;
|
}
|
||||||
}
|
Async::Ready(CollectionEvent::NodeError {
|
||||||
Async::Ready(CollectionEvent::NodeError {
|
peer_id,
|
||||||
|
error,
|
||||||
|
}) => {
|
||||||
|
let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
|
||||||
|
.expect("We insert into connected_points whenever a connection is \
|
||||||
|
opened and remove only when a connection is closed; the \
|
||||||
|
underlying API is guaranteed to always deliver a connection \
|
||||||
|
closed message after it has been opened, and no two closed \
|
||||||
|
messages; QED");
|
||||||
|
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
|
||||||
|
action = Default::default();
|
||||||
|
out_event = RawSwarmEvent::NodeError {
|
||||||
peer_id,
|
peer_id,
|
||||||
|
endpoint,
|
||||||
error,
|
error,
|
||||||
}) => {
|
};
|
||||||
let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
|
|
||||||
.expect("We insert into connected_points whenever a connection is \
|
|
||||||
opened and remove only when a connection is closed; the \
|
|
||||||
underlying API is guaranteed to always deliver a connection \
|
|
||||||
closed message after it has been opened, and no two closed \
|
|
||||||
messages; QED");
|
|
||||||
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
|
|
||||||
action = Default::default();
|
|
||||||
out_event = RawSwarmEvent::NodeError {
|
|
||||||
peer_id,
|
|
||||||
endpoint,
|
|
||||||
error,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
Async::Ready(CollectionEvent::NodeClosed { peer_id }) => {
|
|
||||||
let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
|
|
||||||
.expect("We insert into connected_points whenever a connection is \
|
|
||||||
opened and remove only when a connection is closed; the \
|
|
||||||
underlying API is guaranteed to always deliver a connection \
|
|
||||||
closed message after it has been opened, and no two closed \
|
|
||||||
messages; QED");
|
|
||||||
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
|
|
||||||
action = Default::default();
|
|
||||||
out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint };
|
|
||||||
}
|
|
||||||
Async::Ready(CollectionEvent::NodeEvent { peer_id, event }) => {
|
|
||||||
action = Default::default();
|
|
||||||
out_event = RawSwarmEvent::NodeEvent { peer_id, event };
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some((peer_id, handler, first, rest)) = action.start_dial_out {
|
|
||||||
self.start_dial_out(peer_id, handler, first, rest);
|
|
||||||
}
|
}
|
||||||
|
Async::Ready(CollectionEvent::NodeClosed { peer_id }) => {
|
||||||
if let Some(interrupt) = action.interrupt {
|
let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
|
||||||
// TODO: improve proof or remove; this is too complicated right now
|
.expect("We insert into connected_points whenever a connection is \
|
||||||
self.active_nodes
|
opened and remove only when a connection is closed; the \
|
||||||
.interrupt(interrupt)
|
underlying API is guaranteed to always deliver a connection \
|
||||||
.expect("interrupt is guaranteed to be gathered from `out_reach_attempts`;
|
closed message after it has been opened, and no two closed \
|
||||||
we insert in out_reach_attempts only when we call \
|
messages; QED");
|
||||||
active_nodes.add_reach_attempt, and we remove only when we call \
|
debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
|
||||||
interrupt or when a reach attempt succeeds or errors; therefore the \
|
action = Default::default();
|
||||||
out_reach_attempts should always be in sync with the actual \
|
out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint };
|
||||||
attempts; QED");
|
}
|
||||||
|
Async::Ready(CollectionEvent::NodeEvent { peer_id, event }) => {
|
||||||
|
action = Default::default();
|
||||||
|
out_event = RawSwarmEvent::NodeEvent { peer_id, event };
|
||||||
}
|
}
|
||||||
|
|
||||||
return Async::Ready(out_event);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Async::NotReady
|
if let Some((peer_id, handler, first, rest)) = action.start_dial_out {
|
||||||
|
self.start_dial_out(peer_id, handler, first, rest);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(interrupt) = action.interrupt {
|
||||||
|
// TODO: improve proof or remove; this is too complicated right now
|
||||||
|
self.active_nodes
|
||||||
|
.interrupt(interrupt)
|
||||||
|
.expect("interrupt is guaranteed to be gathered from `out_reach_attempts`;
|
||||||
|
we insert in out_reach_attempts only when we call \
|
||||||
|
active_nodes.add_reach_attempt, and we remove only when we call \
|
||||||
|
interrupt or when a reach attempt succeeds or errors; therefore the \
|
||||||
|
out_reach_attempts should always be in sync with the actual \
|
||||||
|
attempts; QED");
|
||||||
|
}
|
||||||
|
|
||||||
|
Async::Ready(out_event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,13 +171,10 @@ where
|
|||||||
mut substream: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
|
mut substream: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
|
||||||
_info: Self::OutboundOpenInfo
|
_info: Self::OutboundOpenInfo
|
||||||
) {
|
) {
|
||||||
match mem::replace(&mut self.out_state, OutState::Poisoned) {
|
if let OutState::Upgrading { expires } = mem::replace(&mut self.out_state, OutState::Poisoned) {
|
||||||
OutState::Upgrading { expires } => {
|
// We always upgrade with the intent of immediately pinging.
|
||||||
// We always upgrade with the intent of immediately pinging.
|
substream.ping(Instant::now());
|
||||||
substream.ping(Instant::now());
|
self.out_state = OutState::WaitingForPong { substream, expires }
|
||||||
self.out_state = OutState::WaitingForPong { substream, expires };
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -232,100 +229,92 @@ where
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
match mem::replace(&mut self.out_state, OutState::Poisoned) {
|
||||||
match mem::replace(&mut self.out_state, OutState::Poisoned) {
|
OutState::Shutdown | OutState::Poisoned => {
|
||||||
OutState::Shutdown | OutState::Poisoned => {
|
// This shuts down the whole connection with the remote.
|
||||||
// This shuts down the whole connection with the remote.
|
Ok(Async::Ready(None))
|
||||||
return Ok(Async::Ready(None));
|
},
|
||||||
},
|
|
||||||
|
|
||||||
OutState::Disabled => {
|
OutState::Disabled => {
|
||||||
return Ok(Async::NotReady);
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Need to open an outgoing substream.
|
||||||
|
OutState::NeedToOpen { expires } => {
|
||||||
|
// Note that we ignore the expiration here, as it's pretty unlikely to happen.
|
||||||
|
// The expiration is only here to be transmitted to the `Upgrading`.
|
||||||
|
self.out_state = OutState::Upgrading { expires };
|
||||||
|
Ok(Async::Ready(Some(
|
||||||
|
ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||||
|
upgrade: self.ping_config,
|
||||||
|
info: (),
|
||||||
|
},
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Waiting for the upgrade to be negotiated.
|
||||||
|
OutState::Upgrading { mut expires } => poll_delay!(expires => {
|
||||||
|
NotReady => {
|
||||||
|
self.out_state = OutState::Upgrading { expires };
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
},
|
||||||
|
Ready => {
|
||||||
|
self.out_state = OutState::Shutdown;
|
||||||
|
let ev = OutEvent::Unresponsive;
|
||||||
|
Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev))))
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
|
||||||
|
// Waiting for the pong.
|
||||||
|
OutState::WaitingForPong { mut substream, mut expires } => {
|
||||||
|
// We start by dialing the substream, leaving one last chance for it to
|
||||||
|
// produce the pong even if the expiration happened.
|
||||||
|
match substream.poll()? {
|
||||||
|
Async::Ready(Some(started)) => {
|
||||||
|
self.out_state = OutState::Idle {
|
||||||
|
substream,
|
||||||
|
next_ping: Delay::new(Instant::now() + self.delay_to_next_ping),
|
||||||
|
};
|
||||||
|
let ev = OutEvent::PingSuccess(started.elapsed());
|
||||||
|
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev))));
|
||||||
|
}
|
||||||
|
Async::NotReady => {}
|
||||||
|
Async::Ready(None) => {
|
||||||
|
self.out_state = OutState::Shutdown;
|
||||||
|
return Ok(Async::Ready(None));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Need to open an outgoing substream.
|
// Check the expiration.
|
||||||
OutState::NeedToOpen { expires } => {
|
poll_delay!(expires => {
|
||||||
// Note that we ignore the expiration here, as it's pretty unlikely to happen.
|
NotReady => {
|
||||||
// The expiration is only here to be transmitted to the `Upgrading`.
|
self.out_state = OutState::WaitingForPong { substream, expires };
|
||||||
self.out_state = OutState::Upgrading { expires };
|
// Both `substream` and `expires` and not ready, so it's fine to return
|
||||||
return Ok(Async::Ready(Some(
|
// not ready.
|
||||||
ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
Ok(Async::NotReady)
|
||||||
upgrade: self.ping_config,
|
},
|
||||||
info: (),
|
Ready => {
|
||||||
},
|
self.out_state = OutState::Shutdown;
|
||||||
)));
|
let ev = OutEvent::Unresponsive;
|
||||||
}
|
Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev))))
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Waiting for the upgrade to be negotiated.
|
OutState::Idle { mut substream, mut next_ping } => {
|
||||||
OutState::Upgrading { mut expires } => poll_delay!(expires => {
|
// Poll the future that fires when we need to ping the node again.
|
||||||
NotReady => {
|
poll_delay!(next_ping => {
|
||||||
self.out_state = OutState::Upgrading { expires };
|
NotReady => {
|
||||||
return Ok(Async::NotReady);
|
self.out_state = OutState::Idle { substream, next_ping };
|
||||||
},
|
Ok(Async::NotReady)
|
||||||
Ready => {
|
},
|
||||||
self.out_state = OutState::Shutdown;
|
Ready => {
|
||||||
let ev = OutEvent::Unresponsive;
|
let expires = Delay::new(Instant::now() + self.ping_timeout);
|
||||||
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev))));
|
substream.ping(Instant::now());
|
||||||
},
|
self.out_state = OutState::WaitingForPong { substream, expires };
|
||||||
}),
|
Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(OutEvent::PingStart))))
|
||||||
|
},
|
||||||
// Waiting for the pong.
|
})
|
||||||
OutState::WaitingForPong {
|
|
||||||
mut substream,
|
|
||||||
mut expires,
|
|
||||||
} => {
|
|
||||||
// We start by dialing the substream, leaving one last chance for it to
|
|
||||||
// produce the pong even if the expiration happened.
|
|
||||||
match substream.poll()? {
|
|
||||||
Async::Ready(Some(started)) => {
|
|
||||||
self.out_state = OutState::Idle {
|
|
||||||
substream,
|
|
||||||
next_ping: Delay::new(Instant::now() + self.delay_to_next_ping),
|
|
||||||
};
|
|
||||||
let ev = OutEvent::PingSuccess(started.elapsed());
|
|
||||||
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev))));
|
|
||||||
}
|
|
||||||
Async::NotReady => {}
|
|
||||||
Async::Ready(None) => {
|
|
||||||
self.out_state = OutState::Shutdown;
|
|
||||||
return Ok(Async::Ready(None));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Check the expiration.
|
|
||||||
poll_delay!(expires => {
|
|
||||||
NotReady => {
|
|
||||||
self.out_state = OutState::WaitingForPong { substream, expires };
|
|
||||||
// Both `substream` and `expires` and not ready, so it's fine to return
|
|
||||||
// not ready.
|
|
||||||
return Ok(Async::NotReady);
|
|
||||||
},
|
|
||||||
Ready => {
|
|
||||||
self.out_state = OutState::Shutdown;
|
|
||||||
let ev = OutEvent::Unresponsive;
|
|
||||||
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev))));
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
OutState::Idle {
|
|
||||||
mut substream,
|
|
||||||
mut next_ping,
|
|
||||||
} => {
|
|
||||||
// Poll the future that fires when we need to ping the node again.
|
|
||||||
poll_delay!(next_ping => {
|
|
||||||
NotReady => {
|
|
||||||
self.out_state = OutState::Idle { substream, next_ping };
|
|
||||||
return Ok(Async::NotReady);
|
|
||||||
},
|
|
||||||
Ready => {
|
|
||||||
let expires = Delay::new(Instant::now() + self.ping_timeout);
|
|
||||||
substream.ping(Instant::now());
|
|
||||||
self.out_state = OutState::WaitingForPong { substream, expires };
|
|
||||||
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(OutEvent::PingStart))));
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,6 @@ pub trait PeerAccess {
|
|||||||
/// Sets the TTL of an address of a peer. Adds the address if it is currently unknown.
|
/// Sets the TTL of an address of a peer. Adds the address if it is currently unknown.
|
||||||
///
|
///
|
||||||
/// Contrary to `add_addr`, this operation is never a no-op.
|
/// Contrary to `add_addr`, this operation is never a no-op.
|
||||||
#[inline]
|
|
||||||
fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL);
|
fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL);
|
||||||
|
|
||||||
// Similar to calling `set_addr_ttl` multiple times in a row.
|
// Similar to calling `set_addr_ttl` multiple times in a row.
|
||||||
|
Reference in New Issue
Block a user