Add logging to libp2p-swarm (#149)

* Add logging to libp2p-swarm

* Fix style
This commit is contained in:
Pierre Krieger 2018-03-16 16:39:02 +01:00 committed by GitHub
parent a3b1d785e4
commit d6f1b9bf5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 96 additions and 8 deletions

View File

@ -6,6 +6,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
bytes = "0.4"
fnv = "1.0"
log = "0.4"
multiaddr = "0.2.0"
multistream-select = { path = "../multistream-select" }
futures = { version = "0.1", features = ["use_std"] }

View File

@ -48,7 +48,6 @@ use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use parking_lot::Mutex;
use std::io::Error as IoError;
use std::mem;
use std::sync::Arc;
use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode};
@ -157,10 +156,15 @@ where
.get(&addr)
.map(|c| c.clone())
{
debug!(target: "libp2p-swarm", "ConnectionReuse: Reusing multiplexed connection to {} \
instead of dialing", addr);
let future = connec.outbound().map(|s| (s, addr));
return Ok(Box::new(future) as Box<_>);
}
debug!(target: "libp2p-swarm", "ConnectionReuse: No existing connection to {} ; dialing",
addr);
// TODO: handle if we're already in the middle in dialing that same node?
// TODO: try dialing again if the existing connection has dropped
@ -255,11 +259,14 @@ where
}
Ok(Async::NotReady) => {}
Ok(Async::Ready(None)) => {
debug!(target: "libp2p-swarm", "ConnectionReuse: listener has been closed");
if self.connections.is_empty() && self.current_upgrades.is_empty() {
return Ok(Async::Ready(None));
}
}
Err(err) => {
debug!(target: "libp2p-swarm", "ConnectionReuse: error while polling \
listener: {:?}", err);
if self.connections.is_empty() && self.current_upgrades.is_empty() {
return Err(err);
}
@ -288,6 +295,8 @@ where
}
Err(err) => {
// Insert the rest of the pending upgrades, but not the current one.
debug!(target: "libp2p-swarm", "ConnectionReuse: error while upgrading \
listener connection: {:?}", err);
return Ok(Async::Ready(Some(future::err(err))));
}
}
@ -310,6 +319,8 @@ where
self.connections.push((muxer, next_incoming, client_addr));
}
Err(err) => {
debug!(target: "libp2p-swarm", "ConnectionReuse: error while upgrading the \
multiplexed incoming connection: {:?}", err);
// Insert the rest of the pending connections, but not the current one.
return Ok(Async::Ready(Some(future::err(err))));
}
@ -370,8 +381,11 @@ where
Ok(Async::NotReady) => {
lock.next_incoming.push((muxer, future, addr));
}
Err(_) => {
Err(err) => {
// In case of error, we just not push back the element, which drops it.
debug!(target: "libp2p-swarm", "ConnectionReuse incoming: one of the \
multiplexed substreams produced an error: {:?}",
err);
}
}
}

View File

@ -206,6 +206,8 @@ extern crate bytes;
extern crate fnv;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate log;
extern crate multistream_select;
extern crate parking_lot;
extern crate smallvec;

View File

@ -137,6 +137,8 @@ where
Du: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
Du::Output: Into<C::Output>,
{
trace!(target: "libp2p-swarm", "Swarm dialing {}", multiaddr);
match self.transport
.clone()
.with_upgrade(upgrade)
@ -171,6 +173,8 @@ where
Df: FnOnce(Du::Output, Multiaddr) -> Dfu + 'static, // TODO: 'static :-/
Dfu: IntoFuture<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
{
trace!(target: "libp2p-swarm", "Swarm dialing {} with custom handler", multiaddr);
match self.transport.clone().with_upgrade(upgrade).dial(multiaddr) {
Ok(dial) => {
let dial = Box::new(dial.and_then(|(d, m)| and_then(d, m))) as Box<_>;
@ -188,6 +192,7 @@ where
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
match self.upgraded.clone().listen_on(multiaddr) {
Ok((listener, new_addr)) => {
trace!(target: "libp2p-swarm", "Swarm listening on {}", new_addr);
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_listeners.unbounded_send(listener);
@ -250,12 +255,16 @@ where
match self.next_incoming.poll() {
Ok(Async::Ready(connec)) => {
trace!(target: "libp2p-swarm", "Swarm received new multiplexed \
incoming connection");
self.next_incoming = self.upgraded.clone().next_incoming();
self.listeners_upgrade.push(connec);
}
Ok(Async::NotReady) => {}
// TODO: may not be the best idea because we're killing the whole server
Err(_err) => {
Err(err) => {
debug!(target: "libp2p-swarm", "Error in multiplexed incoming \
connection: {:?}", err);
self.next_incoming = self.upgraded.clone().next_incoming();
}
};
@ -294,6 +303,8 @@ where
let mut listener = self.listeners.swap_remove(n);
match listener.poll() {
Ok(Async::Ready(Some(upgrade))) => {
trace!(target: "libp2p-swarm", "Swarm received new connection on \
listener socket");
self.listeners.push(listener);
self.listeners_upgrade.push(upgrade);
}
@ -301,7 +312,9 @@ where
self.listeners.push(listener);
}
Ok(Async::Ready(None)) => {}
Err(_err) => {} // Ignoring errors
Err(err) => {
warn!(target: "libp2p-swarm", "Error in listener: {:?}", err);
}
};
}
@ -309,6 +322,10 @@ where
let mut upgrade = self.listeners_upgrade.swap_remove(n);
match upgrade.poll() {
Ok(Async::Ready((output, client_addr))) => {
debug!(
"Successfully upgraded listened connection with {}",
client_addr
);
self.to_process.push(future::Either::A(
handler(output, client_addr).into_future(),
));
@ -316,7 +333,9 @@ where
Ok(Async::NotReady) => {
self.listeners_upgrade.push(upgrade);
}
Err(_err) => {} // Ignoring errors
Err(err) => {
debug!(target: "libp2p-swarm", "Error in listener upgrade: {:?}", err);
}
}
}
@ -324,22 +343,30 @@ where
let mut dialer = self.dialers.swap_remove(n);
match dialer.poll() {
Ok(Async::Ready((output, addr))) => {
trace!("Successfully upgraded dialed connection with {}", addr);
self.to_process
.push(future::Either::A(handler(output, addr).into_future()));
}
Ok(Async::NotReady) => {
self.dialers.push(dialer);
}
Err(_err) => {} // Ignoring errors
Err(err) => {
debug!(target: "libp2p-swarm", "Error in dialer upgrade: {:?}", err);
}
}
}
for n in (0..self.to_process.len()).rev() {
let mut to_process = self.to_process.swap_remove(n);
match to_process.poll() {
Ok(Async::Ready(())) => {}
Ok(Async::Ready(())) => {
trace!(target: "libp2p-swarm", "Future returned by swarm handler driven to \
completion");
}
Ok(Async::NotReady) => self.to_process.push(to_process),
Err(_err) => {} // Ignoring errors
Err(err) => {
debug!(target: "libp2p-swarm", "Error in processing: {:?}", err);
}
}
}

View File

@ -931,8 +931,23 @@ where
.map_err(|err| IoError::new(IoErrorKind::Other, err));
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr))
})
.then(|negotiated| {
match negotiated {
Ok((_, _, _, ref client_addr)) => {
debug!(target: "libp2p-swarm", "Successfully negotiated protocol \
upgrade with {}", client_addr)
},
Err(ref err) => {
debug!(target: "libp2p-swarm", "Error while negotiated protocol \
upgrade: {:?}", err)
},
};
negotiated
})
.and_then(move |(upgrade_id, connection, upgrade, client_addr)| {
let f = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &client_addr);
debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}",
client_addr);
f.map(|v| (v, client_addr))
});
@ -971,8 +986,23 @@ where
.map_err(|err| IoError::new(IoErrorKind::Other, err));
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr))
})
.then(|negotiated| {
match negotiated {
Ok((_, _, _, ref client_addr)) => {
debug!(target: "libp2p-swarm", "Successfully negotiated protocol \
upgrade with {}", client_addr)
}
Err(ref err) => {
debug!(target: "libp2p-swarm", "Error while negotiated protocol \
upgrade: {:?}", err)
}
};
negotiated
})
.and_then(move |(upgrade_id, connection, upgrade, addr)| {
let upg = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr);
debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}",
addr);
upg.map(|u| (u, addr))
});
@ -1033,8 +1063,22 @@ where
.and_then(move |(connection, remote_addr)| {
let iter = upgrade.protocol_names()
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
let remote_addr2 = remote_addr.clone();
multistream_select::listener_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err))
.then(move |negotiated| {
match negotiated {
Ok(_) => {
debug!(target: "libp2p-swarm", "Successfully negotiated \
protocol upgrade with {}", remote_addr2)
},
Err(ref err) => {
debug!(target: "libp2p-swarm", "Error while negotiated \
protocol upgrade: {:?}", err)
},
};
negotiated
})
.and_then(move |(upgrade_id, connection)| {
let fut = upgrade.upgrade(
connection,