Various fixes and improvements (#166)

This commit is contained in:
Pierre Krieger 2018-04-11 17:32:32 +02:00 committed by GitHub
parent 342a9f69d3
commit 2445d9e9ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 263 additions and 95 deletions

View File

@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{stream, Future, IntoFuture, Stream};
use futures::{future, stream, Future, IntoFuture, Stream};
use libp2p_peerstore::{PeerAccess, PeerId, Peerstore};
use libp2p_swarm::{MuxedTransport, Transport};
use multiaddr::{AddrComponent, Multiaddr};
@ -91,27 +91,43 @@ where
let listener = listener.map(move |connec| {
let peerstore = peerstore.clone();
let identify_upgrade = identify_upgrade.clone();
let fut = connec
.and_then(move |(connec, client_addr)| {
let fut = connec.and_then(move |(connec, client_addr)| {
for peer_id in peerstore.peers() {
let peer = match peerstore.peer(&peer_id) {
Some(p) => p,
None => continue,
};
if peer.addrs().any(|addr| addr == client_addr) {
debug!(target: "libp2p-identify", "Incoming substream from {} \
identified as {:?}", client_addr,
peer_id);
let ret = (connec, AddrComponent::P2P(peer_id.into_bytes()).into());
return future::Either::A(future::ok(ret));
}
}
debug!(target: "libp2p-identify", "Incoming connection from {}, dialing back \
in order to identify", client_addr);
// Dial the address that connected to us and try upgrade with the
// identify protocol.
identify_upgrade
let future = identify_upgrade
.clone()
.dial(client_addr.clone())
.map_err(|_| {
.map_err(move |_| {
IoError::new(IoErrorKind::Other, "couldn't dial back incoming node")
})
.map(move |id| (id, connec))
})
.into_future()
.and_then(move |(dial, connec)| dial.map(move |dial| (dial, connec)))
.and_then(move |((identify, original_addr), connec)| {
// Compute the "real" address of the node (in the form `/p2p/...`) and add
// it to the peerstore.
// Compute the "real" address of the node (in the form `/p2p/...`) and
// add it to the peerstore.
let real_addr = match identify {
IdentifyOutput::RemoteInfo { info, .. } => process_identify_info(
&info,
&*peerstore.clone(),
original_addr,
original_addr.clone(),
addr_ttl,
)?,
_ => unreachable!(
@ -120,7 +136,16 @@ where
),
};
debug!(target: "libp2p-identify", "Identified {} as {}", original_addr,
real_addr);
Ok((connec, real_addr))
})
.map_err(move |err| {
warn!(target: "libp2p-identify", "Failed to identify incoming {}",
client_addr);
err
});
future::Either::B(future)
});
Box::new(fut) as Box<Future<Item = _, Error = _>>
@ -143,17 +168,45 @@ where
.collect::<Vec<_>>()
.into_iter();
trace!(target: "libp2p-identify", "Try dialing peer ID {:?} ; {} multiaddrs \
loaded from peerstore", peer_id, addrs.len());
let transport = self.transport;
let future = stream::iter_ok(addrs)
// Try to dial each address through the transport.
.filter_map(move |addr| transport.clone().dial(addr).ok())
.filter_map(move |addr| {
match transport.clone().dial(addr) {
Ok(dial) => Some(dial),
Err((_, addr)) => {
warn!(target: "libp2p-identify", "Address {} not supported by \
underlying transport", addr);
None
},
}
})
.and_then(move |dial| dial)
// Pick the first non-failing dial result.
// Pick the first non-failing dial result by filtering out the ones which fail.
.then(|res| Ok(res))
.filter_map(|res| res.ok())
.into_future()
.map_err(|(err, _)| err)
.and_then(|(val, _)| val.ok_or(IoErrorKind::InvalidData.into())) // TODO: wrong error
.and_then(move |(val, _)| {
match val {
Some((connec, inner_addr)) => {
debug!(target: "libp2p-identify", "Successfully dialed peer {:?} \
through {}", peer_id,
inner_addr);
Ok((connec, inner_addr))
},
None => {
debug!(target: "libp2p-identify", "All multiaddresses failed when \
dialing peer {:?}", peer_id);
// TODO: wrong error
Err(IoErrorKind::InvalidData.into())
},
}
})
// Replace the multiaddress with the one of the form `/p2p/...` or `/ipfs/...`.
.map(move |(socket, _inner_client_addr)| (socket, addr));
Ok(Box::new(future) as Box<_>)
@ -165,6 +218,8 @@ where
let transport = self.transport;
let identify_upgrade = transport.clone().with_upgrade(IdentifyProtocolConfig);
trace!(target: "libp2p-identify", "Pass through when dialing {}", addr);
// We dial a first time the node and upgrade it to identify.
let dial = match identify_upgrade.dial(addr) {
Ok(d) => d,
@ -233,22 +288,37 @@ where
let addr_ttl = self.addr_ttl;
let future = self.transport.next_incoming().map(move |incoming| {
let future = incoming
.and_then(move |(connec, client_addr)| {
let peerstore = peerstore.clone();
let future = incoming.and_then(move |(connec, client_addr)| {
for peer_id in peerstore.peers() {
let peer = match peerstore.peer(&peer_id) {
Some(p) => p,
None => continue,
};
if peer.addrs().any(|addr| addr == client_addr) {
debug!(target: "libp2p-identify", "Incoming substream from {} \
identified as {:?}", client_addr,
peer_id);
let ret = (connec, AddrComponent::P2P(peer_id.into_bytes()).into());
return future::Either::A(future::ok(ret));
}
}
// On an incoming connection, dial back the node and upgrade to the identify
// protocol.
identify_upgrade
let future = identify_upgrade
.clone()
.dial(client_addr.clone())
.map_err(|_| {
IoError::new(IoErrorKind::Other, "couldn't dial back incoming node")
})
.map(move |id| (id, connec))
})
.and_then(move |(dial, connec)| dial.map(move |dial| (dial, connec)))
.into_future()
.and_then(move |dial| dial)
.map(move |dial| (dial, connec))
.and_then(move |(identify, connec)| {
// Add the info to the peerstore and compute the "real" address of the node (in
// the form `/p2p/...`).
// Add the info to the peerstore and compute the "real" address of the
// node (in the form `/p2p/...`).
let real_addr = match identify {
(IdentifyOutput::RemoteInfo { info, .. }, old_addr) => {
process_identify_info(&info, &*peerstore, old_addr, addr_ttl)?
@ -261,6 +331,8 @@ where
Ok((connec, real_addr))
});
future::Either::B(future)
});
Box::new(future) as Box<Future<Item = _, Error = _>>
});

View File

@ -15,6 +15,7 @@ libp2p-identify = { path = "../identify" }
libp2p-peerstore = { path = "../peerstore" }
libp2p-ping = { path = "../ping" }
libp2p-swarm = { path = "../swarm" }
log = "0.4"
multiaddr = "0.3"
parking_lot = "0.5.1"
protobuf = "1.4.2"

View File

@ -27,12 +27,13 @@ use fnv::FnvHashMap;
use futures::{self, future, Future};
use futures::sync::oneshot;
use kad_server::{KadServerInterface, KademliaServerConfig, KademliaServerController};
use kbucket::{KBucketsTable, UpdateOutcome};
use kbucket::{KBucketsPeerId, KBucketsTable, UpdateOutcome};
use libp2p_peerstore::{PeerAccess, PeerId, Peerstore};
use libp2p_swarm::{Endpoint, MuxedTransport, SwarmController};
use libp2p_swarm::ConnectionUpgrade;
use multiaddr::Multiaddr;
use parking_lot::Mutex;
use protocol::ConnectionType;
use query;
use std::collections::hash_map::Entry;
use std::fmt;
@ -364,6 +365,15 @@ where
self.kbuckets.my_id()
}
fn peer_info(&self, peer_id: &PeerId) -> (Vec<Multiaddr>, ConnectionType) {
let addrs = self.peer_store
.peer(peer_id)
.into_iter()
.flat_map(|p| p.addrs())
.collect::<Vec<_>>();
(addrs, ConnectionType::Connected) // ConnectionType meh :-/
}
#[inline]
fn kbuckets_update(&self, peer: &PeerId) {
// TODO: is this the right place for this check?
@ -382,7 +392,19 @@ where
#[inline]
fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec<PeerId> {
self.kbuckets.find_closest(addr).collect()
let mut intermediate: Vec<_> = self.kbuckets.find_closest(addr).collect();
let my_id = self.kbuckets.my_id().clone();
if let Some(pos) = intermediate
.iter()
.position(|e| e.distance_with(&addr) >= my_id.distance_with(&addr))
{
if intermediate[pos] != my_id {
intermediate.insert(pos, my_id);
}
} else {
intermediate.push(my_id);
}
intermediate
}
}

View File

@ -53,6 +53,10 @@ pub trait KadServerInterface: Clone {
/// Returns the peer ID of the local node.
fn local_id(&self) -> &PeerId;
/// Returns known information about the peer. Not atomic/thread-safe in the sense that
/// information can change immediately after being returned and before they are processed.
fn peer_info(&self, _: &PeerId) -> (Vec<Multiaddr>, protocol::ConnectionType);
/// Updates an entry in the K-Buckets. Called whenever that peer sends us a message.
fn kbuckets_update(&self, peer: &PeerId);
@ -408,19 +412,34 @@ where
}
// Builds a `KadMsg` that handles a `FIND_NODE` request received from the remote.
fn handle_find_node_req<I>(interface: &I, _requested_key: &[u8]) -> KadMsg
fn handle_find_node_req<I>(interface: &I, requested_key: &[u8]) -> KadMsg
where
I: ?Sized + KadServerInterface,
{
KadMsg::FindNodeRes {
closer_peers: vec![
protocol::Peer {
node_id: interface.local_id().clone(),
multiaddrs: vec![],
connection_ty: protocol::ConnectionType::Connected,
},
], // TODO: fill the multiaddresses from the peer store
let peer_id = match PeerId::from_bytes(requested_key.to_vec()) {
// TODO: suboptimal
Ok(id) => id,
Err(_) => {
return KadMsg::FindNodeRes {
closer_peers: vec![],
}
}
};
let closer_peers = interface
.kbuckets_find_closest(&peer_id)
.into_iter()
.map(|peer| {
let (multiaddrs, connection_ty) = interface.peer_info(&peer);
protocol::Peer {
node_id: peer,
multiaddrs: multiaddrs,
connection_ty: connection_ty,
}
})
.collect();
KadMsg::FindNodeRes { closer_peers }
}
// Builds a `KadMsg` that handles a `FIND_VALUE` request received from the remote.

View File

@ -72,6 +72,8 @@ extern crate libp2p_identify;
extern crate libp2p_peerstore;
extern crate libp2p_ping;
extern crate libp2p_swarm;
#[macro_use]
extern crate log;
extern crate multiaddr;
extern crate parking_lot;
extern crate protobuf;
@ -82,7 +84,7 @@ extern crate tokio_timer;
extern crate varint;
pub use self::high_level::{KademliaConfig, KademliaController, KademliaControllerPrototype};
pub use self::high_level::KademliaUpgrade;
pub use self::high_level::{KademliaProcessingFuture, KademliaUpgrade};
mod high_level;
mod kad_server;

View File

@ -142,6 +142,9 @@ fn query<'a, I>(
where
I: QueryInterface + 'a,
{
debug!(target: "libp2p-kad", "Start query for {:?} ; num results = {}", searched_key,
num_results);
// State of the current iterative process.
struct State<'a> {
// If true, we are still in the first step of the algorithm where we try to find the
@ -206,6 +209,10 @@ where
to_contact
};
debug!(target: "libp2p-kad", "New query round ; {} queries in progress ; contacting \
{} new peers", state.current_attempts_fut.len(),
to_contact.len());
// For each node in `to_contact`, start an RPC query and a corresponding entry in the two
// `state.current_attempts_*` fields.
for peer in to_contact {
@ -229,7 +236,7 @@ where
// values back when inside the loop.
let current_attempts_fut = mem::replace(&mut state.current_attempts_fut, Vec::new());
if current_attempts_fut.is_empty() {
// If `current_attempts_fut` is empty, then `select_all` would panic. It attempts
// If `current_attempts_fut` is empty, then `select_all` would panic. It happens
// when we have no additional node to query.
let future = future::ok(future::Loop::Break(state));
return future::Either::A(future);
@ -254,18 +261,23 @@ where
// `message` contains the reason why the current future was woken up.
let closer_peers = match message {
Ok(msg) => msg,
Err(_) => {
Err(err) => {
trace!(target: "libp2p-kad", "RPC query failed for {:?}: {:?}", remote_id, err);
state.failed_to_contact.insert(remote_id);
return Ok(future::Loop::Continue(state));
}
};
// Update `state.result` with the fact that we received a valid message from a node.
// Inserting the node we received a response from into `state.result`.
// The code is non-trivial because `state.result` is ordered by distance and is limited
// by `num_results` elements.
if let Some(insert_pos) = state.result.iter().position(|e| {
e.distance_with(&searched_key) >= remote_id.distance_with(&searched_key)
}) {
if state.result[insert_pos] != remote_id {
if state.result.len() >= num_results {
state.result.pop();
}
state.result.insert(insert_pos, remote_id);
}
} else if state.result.len() < num_results {
@ -282,6 +294,8 @@ where
// the remote.
{
let valid_multiaddrs = peer.multiaddrs.drain(..);
trace!(target: "libp2p-kad", "Adding multiaddresses to {:?}: {:?}",
peer.node_id, valid_multiaddrs);
query_interface2.peer_add_addrs(
&peer.node_id,
valid_multiaddrs,
@ -317,10 +331,10 @@ where
{
// Check that our `Vec::with_capacity` is correct.
debug_assert_eq!(state.result.capacity(), num_results);
Ok(future::Loop::Break(state))
} else {
if !local_nearest_node_updated {
trace!(target: "libp2p-kad", "Loop didn't update closer node ; jumping to step 2");
state.looking_for_closer = false;
}
@ -331,6 +345,10 @@ where
future::Either::B(future)
});
let stream = stream.map(|state| state.result);
let stream = stream.map(|state| {
debug!(target: "libp2p-kad", "Query finished with {} results", state.result.len());
state.result
});
Box::new(stream) as Box<_>
}

View File

@ -156,14 +156,12 @@ where
.get(&addr)
.map(|c| c.clone())
{
debug!(target: "libp2p-swarm", "ConnectionReuse: Reusing multiplexed connection to {} \
instead of dialing", addr);
debug!(target: "libp2p-swarm", "Using existing multiplexed connection to {}", addr);
let future = connec.outbound().map(|s| (s, addr));
return Ok(Box::new(future) as Box<_>);
}
debug!(target: "libp2p-swarm", "ConnectionReuse: No existing connection to {} ; dialing",
addr);
debug!(target: "libp2p-swarm", "No existing connection to {} ; dialing", addr);
// TODO: handle if we're already in the middle in dialing that same node?
// TODO: try dialing again if the existing connection has dropped
@ -171,6 +169,8 @@ where
let dial = match self.inner.dial(addr) {
Ok(l) => l,
Err((inner, addr)) => {
warn!(target: "libp2p-swarm", "Failed to dial {} because the underlying \
transport doesn't support this address", addr);
return Err((
ConnectionReuse {
inner: inner,
@ -259,14 +259,13 @@ where
}
Ok(Async::NotReady) => {}
Ok(Async::Ready(None)) => {
debug!(target: "libp2p-swarm", "ConnectionReuse: listener has been closed");
debug!(target: "libp2p-swarm", "listener has been closed");
if self.connections.is_empty() && self.current_upgrades.is_empty() {
return Ok(Async::Ready(None));
}
}
Err(err) => {
debug!(target: "libp2p-swarm", "ConnectionReuse: error while polling \
listener: {:?}", err);
debug!(target: "libp2p-swarm", "error while polling listener: {:?}", err);
if self.connections.is_empty() && self.current_upgrades.is_empty() {
return Err(err);
}
@ -295,8 +294,8 @@ where
}
Err(err) => {
// Insert the rest of the pending upgrades, but not the current one.
debug!(target: "libp2p-swarm", "ConnectionReuse: error while upgrading \
listener connection: {:?}", err);
debug!(target: "libp2p-swarm", "error while upgrading listener connection: \
{:?}", err);
return Ok(Async::Ready(Some(future::err(err))));
}
}
@ -319,7 +318,7 @@ where
self.connections.push((muxer, next_incoming, client_addr));
}
Err(err) => {
debug!(target: "libp2p-swarm", "ConnectionReuse: error while upgrading the \
debug!(target: "libp2p-swarm", "error while upgrading the \
multiplexed incoming connection: {:?}", err);
// Insert the rest of the pending connections, but not the current one.
return Ok(Async::Ready(Some(future::err(err))));
@ -374,6 +373,7 @@ where
Ok(Async::Ready(value)) => {
// A substream is ready ; push back the muxer for the next time this function
// is called, then return.
debug!(target: "libp2p-swarm", "New incoming substream");
let next = muxer.clone().inbound();
lock.next_incoming.push((muxer, next, addr.clone()));
return Ok(Async::Ready(future::ok((value, addr))));

View File

@ -222,7 +222,7 @@ pub mod muxing;
pub mod transport;
pub use self::connection_reuse::ConnectionReuse;
pub use self::multiaddr::Multiaddr;
pub use self::multiaddr::{AddrComponent, Multiaddr};
pub use self::muxing::StreamMuxer;
pub use self::swarm::{swarm, SwarmController, SwarmFuture};
pub use self::transport::{ConnectionUpgrade, OrUpgrade, PlainTextConfig, Transport, UpgradedNode};

View File

@ -255,13 +255,12 @@ where
match self.next_incoming.poll() {
Ok(Async::Ready(connec)) => {
trace!(target: "libp2p-swarm", "Swarm received new multiplexed \
debug!(target: "libp2p-swarm", "Swarm received new multiplexed \
incoming connection");
self.next_incoming = self.upgraded.clone().next_incoming();
self.listeners_upgrade.push(connec);
}
Ok(Async::NotReady) => {}
// TODO: may not be the best idea because we're killing the whole server
Err(err) => {
debug!(target: "libp2p-swarm", "Error in multiplexed incoming \
connection: {:?}", err);
@ -323,7 +322,7 @@ where
match upgrade.poll() {
Ok(Async::Ready((output, client_addr))) => {
debug!(
"Successfully upgraded listened connection with {}",
"Successfully upgraded incoming connection with {}",
client_addr
);
self.to_process.push(future::Either::A(

View File

@ -927,6 +927,7 @@ where
.and_then(move |(connection, client_addr)| {
let iter = upgrade.protocol_names()
.map(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
debug!(target: "libp2p-swarm", "Starting protocol negotiation (dialer)");
let negotiated = multistream_select::dialer_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err));
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr))
@ -949,6 +950,14 @@ where
debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}",
client_addr);
f.map(|v| (v, client_addr))
})
.then(|val| {
match val {
Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \
protocol"),
Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated protocol"),
}
val
});
Ok(Box::new(future))
@ -982,6 +991,7 @@ where
let iter = upgrade
.protocol_names()
.map::<_, fn(_) -> _>(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
debug!(target: "libp2p-swarm", "Starting protocol negotiation (incoming)");
let negotiated = multistream_select::listener_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err));
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr))
@ -1000,10 +1010,19 @@ where
negotiated
})
.and_then(move |(upgrade_id, connection, upgrade, addr)| {
let upg = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr);
let upg = upgrade.upgrade(connection, upgrade_id, Endpoint::Listener, &addr);
debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}",
addr);
upg.map(|u| (u, addr))
})
.then(|val| {
match val {
Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \
protocol"),
Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated \
protocol"),
}
val
});
Box::new(future) as Box<Future<Item = _, Error = _>>
@ -1064,6 +1083,7 @@ where
let iter = upgrade.protocol_names()
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
let remote_addr2 = remote_addr.clone();
debug!(target: "libp2p-swarm", "Starting protocol negotiation (listener)");
multistream_select::listener_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err))
.then(move |negotiated| {

View File

@ -150,14 +150,21 @@ where
let is_wss = match inner_addr.pop() {
Some(AddrComponent::WS) => false,
Some(AddrComponent::WSS) => true,
_ => return Err((self, original_addr)),
_ => {
trace!(target: "libp2p-websocket", "Ignoring dial attempt for {} because it is \
not a websocket multiaddr", original_addr);
return Err((self, original_addr));
}
};
debug!(target: "libp2p-websocket", "Dialing {} through inner transport", inner_addr);
let inner_dial = match self.transport.dial(inner_addr) {
Ok(d) => d,
Err((transport, _)) => {
Err((transport, old_addr)) => {
warn!(target: "libp2p-websocket", "Failed to dial {} because {} is not supported \
by the underlying transport", original_addr,
old_addr);
return Err((
WsConfig {
transport: transport,
@ -201,7 +208,15 @@ where
let read_write = RwStreamSink::new(framed_data);
Box::new(read_write) as Box<AsyncStream>
})
.map(|c| (c, client_addr))
.map(move |c| {
let mut actual_addr = client_addr;
if is_wss {
actual_addr.append(AddrComponent::WSS);
} else {
actual_addr.append(AddrComponent::WS);
};
(c, actual_addr)
})
});
Ok(Box::new(dial) as Box<_>)