From d6f1b9bf5be320f52b439ad69ba1b8b95322469e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 16 Mar 2018 16:39:02 +0100 Subject: [PATCH] Add logging to libp2p-swarm (#149) * Add logging to libp2p-swarm * Fix style --- libp2p-swarm/Cargo.toml | 1 + libp2p-swarm/src/connection_reuse.rs | 18 ++++++++++-- libp2p-swarm/src/lib.rs | 2 ++ libp2p-swarm/src/swarm.rs | 39 ++++++++++++++++++++---- libp2p-swarm/src/transport.rs | 44 ++++++++++++++++++++++++++++ 5 files changed, 96 insertions(+), 8 deletions(-) diff --git a/libp2p-swarm/Cargo.toml b/libp2p-swarm/Cargo.toml index 3b45ef66..50c79fd8 100644 --- a/libp2p-swarm/Cargo.toml +++ b/libp2p-swarm/Cargo.toml @@ -6,6 +6,7 @@ authors = ["Parity Technologies "] [dependencies] bytes = "0.4" fnv = "1.0" +log = "0.4" multiaddr = "0.2.0" multistream-select = { path = "../multistream-select" } futures = { version = "0.1", features = ["use_std"] } diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index 92cc1a75..3f27f703 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -48,7 +48,6 @@ use multiaddr::Multiaddr; use muxing::StreamMuxer; use parking_lot::Mutex; use std::io::Error as IoError; -use std::mem; use std::sync::Arc; use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode}; @@ -157,10 +156,15 @@ where .get(&addr) .map(|c| c.clone()) { + debug!(target: "libp2p-swarm", "ConnectionReuse: Reusing multiplexed connection to {} \ + instead of dialing", addr); let future = connec.outbound().map(|s| (s, addr)); return Ok(Box::new(future) as Box<_>); } + debug!(target: "libp2p-swarm", "ConnectionReuse: No existing connection to {} ; dialing", + addr); + // TODO: handle if we're already in the middle in dialing that same node? // TODO: try dialing again if the existing connection has dropped @@ -255,11 +259,14 @@ where } Ok(Async::NotReady) => {} Ok(Async::Ready(None)) => { + debug!(target: "libp2p-swarm", "ConnectionReuse: listener has been closed"); if self.connections.is_empty() && self.current_upgrades.is_empty() { return Ok(Async::Ready(None)); } } Err(err) => { + debug!(target: "libp2p-swarm", "ConnectionReuse: error while polling \ + listener: {:?}", err); if self.connections.is_empty() && self.current_upgrades.is_empty() { return Err(err); } @@ -288,6 +295,8 @@ where } Err(err) => { // Insert the rest of the pending upgrades, but not the current one. + debug!(target: "libp2p-swarm", "ConnectionReuse: error while upgrading \ + listener connection: {:?}", err); return Ok(Async::Ready(Some(future::err(err)))); } } @@ -310,6 +319,8 @@ where self.connections.push((muxer, next_incoming, client_addr)); } Err(err) => { + debug!(target: "libp2p-swarm", "ConnectionReuse: error while upgrading the \ + multiplexed incoming connection: {:?}", err); // Insert the rest of the pending connections, but not the current one. return Ok(Async::Ready(Some(future::err(err)))); } @@ -370,8 +381,11 @@ where Ok(Async::NotReady) => { lock.next_incoming.push((muxer, future, addr)); } - Err(_) => { + Err(err) => { // In case of error, we just not push back the element, which drops it. + debug!(target: "libp2p-swarm", "ConnectionReuse incoming: one of the \ + multiplexed substreams produced an error: {:?}", + err); } } } diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index ffc80432..79ed8c5f 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -206,6 +206,8 @@ extern crate bytes; extern crate fnv; #[macro_use] extern crate futures; +#[macro_use] +extern crate log; extern crate multistream_select; extern crate parking_lot; extern crate smallvec; diff --git a/libp2p-swarm/src/swarm.rs b/libp2p-swarm/src/swarm.rs index ecc10b57..ab41a0e3 100644 --- a/libp2p-swarm/src/swarm.rs +++ b/libp2p-swarm/src/swarm.rs @@ -137,6 +137,8 @@ where Du: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ Du::Output: Into, { + trace!(target: "libp2p-swarm", "Swarm dialing {}", multiaddr); + match self.transport .clone() .with_upgrade(upgrade) @@ -171,6 +173,8 @@ where Df: FnOnce(Du::Output, Multiaddr) -> Dfu + 'static, // TODO: 'static :-/ Dfu: IntoFuture + 'static, // TODO: 'static :-/ { + trace!(target: "libp2p-swarm", "Swarm dialing {} with custom handler", multiaddr); + match self.transport.clone().with_upgrade(upgrade).dial(multiaddr) { Ok(dial) => { let dial = Box::new(dial.and_then(|(d, m)| and_then(d, m))) as Box<_>; @@ -188,6 +192,7 @@ where pub fn listen_on(&self, multiaddr: Multiaddr) -> Result { match self.upgraded.clone().listen_on(multiaddr) { Ok((listener, new_addr)) => { + trace!(target: "libp2p-swarm", "Swarm listening on {}", new_addr); // Ignoring errors if the receiver has been closed, because in that situation // nothing is going to be processed anyway. let _ = self.new_listeners.unbounded_send(listener); @@ -250,12 +255,16 @@ where match self.next_incoming.poll() { Ok(Async::Ready(connec)) => { + trace!(target: "libp2p-swarm", "Swarm received new multiplexed \ + incoming connection"); self.next_incoming = self.upgraded.clone().next_incoming(); self.listeners_upgrade.push(connec); } Ok(Async::NotReady) => {} // TODO: may not be the best idea because we're killing the whole server - Err(_err) => { + Err(err) => { + debug!(target: "libp2p-swarm", "Error in multiplexed incoming \ + connection: {:?}", err); self.next_incoming = self.upgraded.clone().next_incoming(); } }; @@ -294,6 +303,8 @@ where let mut listener = self.listeners.swap_remove(n); match listener.poll() { Ok(Async::Ready(Some(upgrade))) => { + trace!(target: "libp2p-swarm", "Swarm received new connection on \ + listener socket"); self.listeners.push(listener); self.listeners_upgrade.push(upgrade); } @@ -301,7 +312,9 @@ where self.listeners.push(listener); } Ok(Async::Ready(None)) => {} - Err(_err) => {} // Ignoring errors + Err(err) => { + warn!(target: "libp2p-swarm", "Error in listener: {:?}", err); + } }; } @@ -309,6 +322,10 @@ where let mut upgrade = self.listeners_upgrade.swap_remove(n); match upgrade.poll() { Ok(Async::Ready((output, client_addr))) => { + debug!( + "Successfully upgraded listened connection with {}", + client_addr + ); self.to_process.push(future::Either::A( handler(output, client_addr).into_future(), )); @@ -316,7 +333,9 @@ where Ok(Async::NotReady) => { self.listeners_upgrade.push(upgrade); } - Err(_err) => {} // Ignoring errors + Err(err) => { + debug!(target: "libp2p-swarm", "Error in listener upgrade: {:?}", err); + } } } @@ -324,22 +343,30 @@ where let mut dialer = self.dialers.swap_remove(n); match dialer.poll() { Ok(Async::Ready((output, addr))) => { + trace!("Successfully upgraded dialed connection with {}", addr); self.to_process .push(future::Either::A(handler(output, addr).into_future())); } Ok(Async::NotReady) => { self.dialers.push(dialer); } - Err(_err) => {} // Ignoring errors + Err(err) => { + debug!(target: "libp2p-swarm", "Error in dialer upgrade: {:?}", err); + } } } for n in (0..self.to_process.len()).rev() { let mut to_process = self.to_process.swap_remove(n); match to_process.poll() { - Ok(Async::Ready(())) => {} + Ok(Async::Ready(())) => { + trace!(target: "libp2p-swarm", "Future returned by swarm handler driven to \ + completion"); + } Ok(Async::NotReady) => self.to_process.push(to_process), - Err(_err) => {} // Ignoring errors + Err(err) => { + debug!(target: "libp2p-swarm", "Error in processing: {:?}", err); + } } } diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 176eab9a..f643cd72 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -931,8 +931,23 @@ where .map_err(|err| IoError::new(IoErrorKind::Other, err)); negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr)) }) + .then(|negotiated| { + match negotiated { + Ok((_, _, _, ref client_addr)) => { + debug!(target: "libp2p-swarm", "Successfully negotiated protocol \ + upgrade with {}", client_addr) + }, + Err(ref err) => { + debug!(target: "libp2p-swarm", "Error while negotiated protocol \ + upgrade: {:?}", err) + }, + }; + negotiated + }) .and_then(move |(upgrade_id, connection, upgrade, client_addr)| { let f = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &client_addr); + debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}", + client_addr); f.map(|v| (v, client_addr)) }); @@ -971,8 +986,23 @@ where .map_err(|err| IoError::new(IoErrorKind::Other, err)); negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) }) + .then(|negotiated| { + match negotiated { + Ok((_, _, _, ref client_addr)) => { + debug!(target: "libp2p-swarm", "Successfully negotiated protocol \ + upgrade with {}", client_addr) + } + Err(ref err) => { + debug!(target: "libp2p-swarm", "Error while negotiated protocol \ + upgrade: {:?}", err) + } + }; + negotiated + }) .and_then(move |(upgrade_id, connection, upgrade, addr)| { let upg = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr); + debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}", + addr); upg.map(|u| (u, addr)) }); @@ -1033,8 +1063,22 @@ where .and_then(move |(connection, remote_addr)| { let iter = upgrade.protocol_names() .map::<_, fn(_) -> _>(|(n, t)| (n, ::eq, t)); + let remote_addr2 = remote_addr.clone(); multistream_select::listener_select_proto(connection, iter) .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .then(move |negotiated| { + match negotiated { + Ok(_) => { + debug!(target: "libp2p-swarm", "Successfully negotiated \ + protocol upgrade with {}", remote_addr2) + }, + Err(ref err) => { + debug!(target: "libp2p-swarm", "Error while negotiated \ + protocol upgrade: {:?}", err) + }, + }; + negotiated + }) .and_then(move |(upgrade_id, connection)| { let fut = upgrade.upgrade( connection,