mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-08-01 01:11:58 +00:00
Fix additional addresses not tried (#483)
* Fix additional addresses not tried * More fixes * Produce a dialing error if unsupported multiaddr
This commit is contained in:
@@ -467,14 +467,20 @@ where
|
||||
}
|
||||
}
|
||||
Ok(Async::Ready(Some((InToExtMessage::NodeReached(peer_id, sender), task_id)))) => {
|
||||
match self
|
||||
.tasks
|
||||
.insert(task_id, TaskKnownState::Connected(peer_id.clone()))
|
||||
{
|
||||
Some(TaskKnownState::Pending { .. }) => (),
|
||||
Some(TaskKnownState::Interrupted) => continue,
|
||||
None | Some(TaskKnownState::Connected(_)) => panic!("Inconsistent state"),
|
||||
};
|
||||
let existing = match self.tasks.get_mut(&task_id) {
|
||||
Some(state) => state,
|
||||
None => panic!("Inconsistent state")
|
||||
};
|
||||
|
||||
match existing {
|
||||
TaskKnownState::Pending { .. } => (),
|
||||
TaskKnownState::Interrupted => continue,
|
||||
TaskKnownState::Connected(_) => panic!("Inconsistent state"),
|
||||
}
|
||||
|
||||
*existing = TaskKnownState::Connected(peer_id.clone());
|
||||
}
|
||||
|
||||
let replaced_node = self.nodes.insert(peer_id.clone(), (task_id, sender));
|
||||
let user_datas = extract_from_attempt(&mut self.outbound_attempts, &peer_id);
|
||||
|
@@ -19,7 +19,7 @@
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use fnv::FnvHashMap;
|
||||
use futures::prelude::*;
|
||||
use futures::{prelude::*, future};
|
||||
use muxing;
|
||||
use nodes::collection::{
|
||||
CollectionEvent, CollectionStream, PeerMut as CollecPeerMut, ReachAttemptId,
|
||||
@@ -27,7 +27,7 @@ use nodes::collection::{
|
||||
use nodes::listeners::{ListenersEvent, ListenersStream};
|
||||
use nodes::node::Substream;
|
||||
use std::collections::hash_map::{Entry, OccupiedEntry};
|
||||
use std::io::Error as IoError;
|
||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
use void::Void;
|
||||
use {Endpoint, Multiaddr, PeerId, Transport};
|
||||
|
||||
@@ -370,17 +370,19 @@ where
|
||||
// the borrow checker yells at us.
|
||||
|
||||
if self.active_nodes.peer_mut(&peer_id).is_some() {
|
||||
debug_assert!(!self.out_reach_attempts.contains_key(&peer_id));
|
||||
return Peer::Connected(PeerConnected {
|
||||
peer: self
|
||||
.active_nodes
|
||||
.peer_mut(&peer_id)
|
||||
.expect("we checked for Some"),
|
||||
peer_id,
|
||||
connected_multiaddresses: &self.connected_multiaddresses,
|
||||
connected_multiaddresses: &mut self.connected_multiaddresses,
|
||||
});
|
||||
}
|
||||
|
||||
if self.out_reach_attempts.get_mut(&peer_id).is_some() {
|
||||
debug_assert!(!self.connected_multiaddresses.contains_key(&peer_id));
|
||||
return Peer::PendingConnect(PeerPendingConnect {
|
||||
attempt: match self.out_reach_attempts.entry(peer_id.clone()) {
|
||||
Entry::Occupied(e) => e,
|
||||
@@ -390,6 +392,7 @@ where
|
||||
});
|
||||
}
|
||||
|
||||
debug_assert!(!self.connected_multiaddresses.contains_key(&peer_id));
|
||||
Peer::NotConnected(PeerNotConnected {
|
||||
nodes: self,
|
||||
peer_id,
|
||||
@@ -398,7 +401,7 @@ where
|
||||
|
||||
/// Handles a node reached event from the collection.
|
||||
///
|
||||
/// Optionally returns an event to return from the stream.
|
||||
/// Returns an event to return from the stream.
|
||||
///
|
||||
/// > **Note**: The event **must** have been produced by the collection of nodes, otherwise
|
||||
/// > panics will likely happen.
|
||||
@@ -407,7 +410,16 @@ where
|
||||
peer_id: PeerId,
|
||||
reach_id: ReachAttemptId,
|
||||
closed_outbound_substreams: Option<Vec<TUserData>>,
|
||||
) -> Option<SwarmEvent<TTrans, TMuxer, TUserData>> {
|
||||
) -> SwarmEvent<TTrans, TMuxer, TUserData>
|
||||
where
|
||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||
TTrans::Dial: Send + 'static,
|
||||
TTrans::MultiaddrFuture: Send + 'static,
|
||||
TMuxer: Send + Sync + 'static,
|
||||
TMuxer::OutboundSubstream: Send,
|
||||
TMuxer::Substream: Send,
|
||||
TUserData: Send + 'static,
|
||||
{
|
||||
// We first start looking in the incoming attempts. While this makes the code less optimal,
|
||||
// it also makes the logic easier.
|
||||
if let Some(in_pos) = self
|
||||
@@ -421,20 +433,21 @@ where
|
||||
let closed_multiaddr = self.connected_multiaddresses.remove(&peer_id);
|
||||
// Cancel any outgoing attempt to this peer.
|
||||
if let Some(attempt) = self.out_reach_attempts.remove(&peer_id) {
|
||||
debug_assert_ne!(attempt.id, reach_id);
|
||||
self.active_nodes
|
||||
.interrupt(attempt.id)
|
||||
.expect("State inconsistency: invalid reach attempt cancel");
|
||||
}
|
||||
|
||||
if let Some(closed_outbound_substreams) = closed_outbound_substreams {
|
||||
return Some(SwarmEvent::Replaced {
|
||||
return SwarmEvent::Replaced {
|
||||
peer_id,
|
||||
endpoint,
|
||||
closed_multiaddr,
|
||||
closed_outbound_substreams,
|
||||
});
|
||||
};
|
||||
} else {
|
||||
return Some(SwarmEvent::Connected { peer_id, endpoint });
|
||||
return SwarmEvent::Connected { peer_id, endpoint };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -457,14 +470,14 @@ where
|
||||
};
|
||||
|
||||
if let Some(closed_outbound_substreams) = closed_outbound_substreams {
|
||||
return Some(SwarmEvent::Replaced {
|
||||
return SwarmEvent::Replaced {
|
||||
peer_id,
|
||||
endpoint,
|
||||
closed_multiaddr,
|
||||
closed_outbound_substreams,
|
||||
});
|
||||
};
|
||||
} else {
|
||||
return Some(SwarmEvent::Connected { peer_id, endpoint });
|
||||
return SwarmEvent::Connected { peer_id, endpoint };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -476,22 +489,37 @@ where
|
||||
.find(|(_, a)| a.id == reach_id)
|
||||
.map(|(p, _)| p.clone());
|
||||
if let Some(wrong_peer_id) = wrong_peer_id {
|
||||
let mut attempt = self.out_reach_attempts.remove(&wrong_peer_id).unwrap();
|
||||
let attempt = self.out_reach_attempts.remove(&wrong_peer_id).unwrap();
|
||||
|
||||
let num_remain = attempt.next_attempts.len();
|
||||
let failed_addr = attempt.cur_attempted.clone();
|
||||
|
||||
let opened_attempts = self.active_nodes.peer_mut(&peer_id)
|
||||
.expect("Inconsistent state ; received NodeReached event for invalid node")
|
||||
.close();
|
||||
debug_assert!(opened_attempts.is_empty());
|
||||
|
||||
if !attempt.next_attempts.is_empty() {
|
||||
let mut attempt = attempt;
|
||||
attempt.cur_attempted = attempt.next_attempts.remove(0);
|
||||
attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) {
|
||||
Ok(fut) => self.active_nodes.add_reach_attempt(fut),
|
||||
Err((_, addr)) => {
|
||||
let msg = format!("unsupported multiaddr {}", addr);
|
||||
let fut = future::err(IoError::new(IoErrorKind::Other, msg));
|
||||
self.active_nodes.add_reach_attempt::<_, future::FutureResult<Multiaddr, IoError>>(fut)
|
||||
},
|
||||
};
|
||||
|
||||
self.out_reach_attempts.insert(peer_id.clone(), attempt);
|
||||
}
|
||||
|
||||
return Some(SwarmEvent::PublicKeyMismatch {
|
||||
return SwarmEvent::PublicKeyMismatch {
|
||||
remain_addrs_attempt: num_remain,
|
||||
expected_peer_id: peer_id,
|
||||
actual_peer_id: wrong_peer_id,
|
||||
multiaddr: failed_addr,
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
// We didn't find any entry in neither the outgoing connections not ingoing connections.
|
||||
@@ -508,7 +536,16 @@ where
|
||||
&mut self,
|
||||
reach_id: ReachAttemptId,
|
||||
error: IoError,
|
||||
) -> Option<SwarmEvent<TTrans, TMuxer, TUserData>> {
|
||||
) -> Option<SwarmEvent<TTrans, TMuxer, TUserData>>
|
||||
where
|
||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||
TTrans::Dial: Send + 'static,
|
||||
TTrans::MultiaddrFuture: Send + 'static,
|
||||
TMuxer: Send + Sync + 'static,
|
||||
TMuxer::OutboundSubstream: Send,
|
||||
TMuxer::Substream: Send,
|
||||
TUserData: Send + 'static,
|
||||
{
|
||||
// Search for the attempt in `out_reach_attempts`.
|
||||
// TODO: could be more optimal than iterating over everything
|
||||
let out_reach_peer_id = self
|
||||
@@ -523,7 +560,17 @@ where
|
||||
let failed_addr = attempt.cur_attempted.clone();
|
||||
|
||||
if !attempt.next_attempts.is_empty() {
|
||||
let mut attempt = attempt;
|
||||
attempt.cur_attempted = attempt.next_attempts.remove(0);
|
||||
attempt.id = match self.transport().clone().dial(attempt.cur_attempted.clone()) {
|
||||
Ok(fut) => self.active_nodes.add_reach_attempt(fut),
|
||||
Err((_, addr)) => {
|
||||
let msg = format!("unsupported multiaddr {}", addr);
|
||||
let fut = future::err(IoError::new(IoErrorKind::Other, msg));
|
||||
self.active_nodes.add_reach_attempt::<_, future::FutureResult<Multiaddr, IoError>>(fut)
|
||||
},
|
||||
};
|
||||
|
||||
self.out_reach_attempts.insert(peer_id.clone(), attempt);
|
||||
}
|
||||
|
||||
@@ -721,7 +768,7 @@ where
|
||||
{
|
||||
peer: CollecPeerMut<'a, TUserData>,
|
||||
/// Reference to the `connected_multiaddresses` field of the parent.
|
||||
connected_multiaddresses: &'a FnvHashMap<PeerId, Multiaddr>,
|
||||
connected_multiaddresses: &'a mut FnvHashMap<PeerId, Multiaddr>,
|
||||
peer_id: PeerId,
|
||||
}
|
||||
|
||||
@@ -736,12 +783,13 @@ where
|
||||
// TODO: consider returning a `PeerNotConnected` ; however this makes all the borrows things
|
||||
// much more annoying to deal with
|
||||
pub fn close(self) -> Vec<TUserData> {
|
||||
self.connected_multiaddresses.remove(&self.peer_id);
|
||||
self.peer.close()
|
||||
}
|
||||
|
||||
/// Returns the outcome of the future that resolves the multiaddress of the peer.
|
||||
#[inline]
|
||||
pub fn multiaddr(&self) -> Option<&'a Multiaddr> {
|
||||
pub fn multiaddr(&self) -> Option<&Multiaddr> {
|
||||
self.connected_multiaddresses.get(&self.peer_id)
|
||||
}
|
||||
|
||||
@@ -905,6 +953,7 @@ where
|
||||
impl<TTrans, TMuxer, TUserData> Stream for Swarm<TTrans, TMuxer, TUserData>
|
||||
where
|
||||
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
|
||||
TTrans::Dial: Send + 'static,
|
||||
TTrans::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
|
||||
TTrans::ListenerUpgrade: Send + 'static,
|
||||
TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
|
||||
@@ -954,20 +1003,16 @@ where
|
||||
match self.active_nodes.poll() {
|
||||
Ok(Async::NotReady) => break,
|
||||
Ok(Async::Ready(Some(CollectionEvent::NodeReached { peer_id, id }))) => {
|
||||
if let Some(event) = self.handle_node_reached(peer_id, id, None) {
|
||||
return Ok(Async::Ready(Some(event)));
|
||||
}
|
||||
let event = self.handle_node_reached(peer_id, id, None);
|
||||
return Ok(Async::Ready(Some(event)));
|
||||
}
|
||||
Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
|
||||
peer_id,
|
||||
id,
|
||||
closed_outbound_substreams,
|
||||
}))) => {
|
||||
if let Some(event) =
|
||||
self.handle_node_reached(peer_id, id, Some(closed_outbound_substreams))
|
||||
{
|
||||
return Ok(Async::Ready(Some(event)));
|
||||
}
|
||||
let event = self.handle_node_reached(peer_id, id, Some(closed_outbound_substreams));
|
||||
return Ok(Async::Ready(Some(event)));
|
||||
}
|
||||
Ok(Async::Ready(Some(CollectionEvent::ReachError { id, error }))) => {
|
||||
if let Some(event) = self.handle_reach_error(id, error) {
|
||||
|
Reference in New Issue
Block a user