mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-03 21:21:23 +00:00
Add logging for libp2p-ping (#128)
* Add logging for libp2p-ping * Fix concern * Fix concern again
This commit is contained in:
parent
242702a7a9
commit
9faf3fae29
@ -6,6 +6,7 @@ authors = ["pierre <pierre.krieger1708@gmail.com>"]
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
bytes = "0.4"
|
bytes = "0.4"
|
||||||
libp2p-swarm = { path = "../libp2p-swarm" }
|
libp2p-swarm = { path = "../libp2p-swarm" }
|
||||||
|
log = "0.4.1"
|
||||||
multiaddr = "0.2.0"
|
multiaddr = "0.2.0"
|
||||||
multistream-select = { path = "../multistream-select" }
|
multistream-select = { path = "../multistream-select" }
|
||||||
futures = "0.1"
|
futures = "0.1"
|
||||||
|
@ -81,6 +81,8 @@
|
|||||||
extern crate bytes;
|
extern crate bytes;
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate libp2p_swarm;
|
extern crate libp2p_swarm;
|
||||||
|
#[macro_use]
|
||||||
|
extern crate log;
|
||||||
extern crate multistream_select;
|
extern crate multistream_select;
|
||||||
extern crate parking_lot;
|
extern crate parking_lot;
|
||||||
extern crate rand;
|
extern crate rand;
|
||||||
@ -92,6 +94,7 @@ use futures::future::{FutureResult, IntoFuture, loop_fn, Loop};
|
|||||||
use futures::sync::{mpsc, oneshot};
|
use futures::sync::{mpsc, oneshot};
|
||||||
use libp2p_swarm::Multiaddr;
|
use libp2p_swarm::Multiaddr;
|
||||||
use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint};
|
use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint};
|
||||||
|
use log::Level;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use rand::Rand;
|
use rand::Rand;
|
||||||
use rand::os::OsRng;
|
use rand::os::OsRng;
|
||||||
@ -125,7 +128,7 @@ impl<C> ConnectionUpgrade<C> for Ping
|
|||||||
type Future = FutureResult<Self::Output, IoError>;
|
type Future = FutureResult<Self::Output, IoError>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn upgrade(self, socket: C, _: Self::UpgradeIdentifier, _: Endpoint, _: &Multiaddr)
|
fn upgrade(self, socket: C, _: Self::UpgradeIdentifier, _: Endpoint, remote_addr: &Multiaddr)
|
||||||
-> Self::Future
|
-> Self::Future
|
||||||
{
|
{
|
||||||
// # How does it work?
|
// # How does it work?
|
||||||
@ -156,8 +159,15 @@ impl<C> ConnectionUpgrade<C> for Ping
|
|||||||
let sink_stream = socket.framed(Codec).map(|msg| Message::Received(msg.freeze()));
|
let sink_stream = socket.framed(Codec).map(|msg| Message::Received(msg.freeze()));
|
||||||
let (sink, stream) = sink_stream.split();
|
let (sink, stream) = sink_stream.split();
|
||||||
|
|
||||||
|
let remote_addr = if log_enabled!(target: "libp2p-ping", Level::Debug) {
|
||||||
|
Some(remote_addr.clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let future = loop_fn((sink, stream.select(rx)), move |(sink, stream)| {
|
let future = loop_fn((sink, stream.select(rx)), move |(sink, stream)| {
|
||||||
let expected_pongs = expected_pongs.clone();
|
let expected_pongs = expected_pongs.clone();
|
||||||
|
let remote_addr = remote_addr.clone();
|
||||||
|
|
||||||
stream.into_future().map_err(|(err, _)| err).and_then(move |(message, stream)| {
|
stream.into_future().map_err(|(err, _)| err).and_then(move |(message, stream)| {
|
||||||
let mut expected_pongs = expected_pongs.lock();
|
let mut expected_pongs = expected_pongs.lock();
|
||||||
@ -166,6 +176,8 @@ impl<C> ConnectionUpgrade<C> for Ping
|
|||||||
match message {
|
match message {
|
||||||
Message::Ping(payload, finished) => {
|
Message::Ping(payload, finished) => {
|
||||||
// Ping requested by the user through the `Pinger`.
|
// Ping requested by the user through the `Pinger`.
|
||||||
|
debug!(target: "libp2p-ping", "Sending ping to {:?} with payload {:?}",
|
||||||
|
remote_addr.expect("debug log level is enabled"), payload);
|
||||||
expected_pongs.insert(payload.clone(), finished);
|
expected_pongs.insert(payload.clone(), finished);
|
||||||
Box::new(
|
Box::new(
|
||||||
sink.send(payload).map(|sink| Loop::Continue((sink, stream))),
|
sink.send(payload).map(|sink| Loop::Continue((sink, stream))),
|
||||||
@ -177,11 +189,17 @@ impl<C> ConnectionUpgrade<C> for Ping
|
|||||||
// Payload was ours. Signalling future.
|
// Payload was ours. Signalling future.
|
||||||
// Errors can happen if the user closed the receiving end of
|
// Errors can happen if the user closed the receiving end of
|
||||||
// the future, which is fine to ignore.
|
// the future, which is fine to ignore.
|
||||||
|
debug!(target: "libp2p-ping", "Received pong from {:?} \
|
||||||
|
(payload={:?}) ; ping fufilled",
|
||||||
|
remote_addr.expect("debug log level is enabled"), payload);
|
||||||
let _ = fut.send(());
|
let _ = fut.send(());
|
||||||
Box::new(Ok(Loop::Continue((sink, stream))).into_future()) as
|
Box::new(Ok(Loop::Continue((sink, stream))).into_future()) as
|
||||||
Box<Future<Item = _, Error = _>>
|
Box<Future<Item = _, Error = _>>
|
||||||
} else {
|
} else {
|
||||||
// Payload was not ours. Sending it back.
|
// Payload was not ours. Sending it back.
|
||||||
|
debug!(target: "libp2p-ping", "Received ping from {:?} \
|
||||||
|
(payload={:?}) ; sending back",
|
||||||
|
remote_addr.expect("debug log level is enabled"), payload);
|
||||||
Box::new(
|
Box::new(
|
||||||
sink.send(payload).map(|sink| Loop::Continue((sink, stream))),
|
sink.send(payload).map(|sink| Loop::Continue((sink, stream))),
|
||||||
) as Box<Future<Item = _, Error = _>>
|
) as Box<Future<Item = _, Error = _>>
|
||||||
@ -213,6 +231,7 @@ impl Pinger {
|
|||||||
pub fn ping(&mut self) -> Box<Future<Item = (), Error = Box<Error + Send + Sync>>> {
|
pub fn ping(&mut self) -> Box<Future<Item = (), Error = Box<Error + Send + Sync>>> {
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
let payload: [u8; 32] = Rand::rand(&mut self.os_rng);
|
let payload: [u8; 32] = Rand::rand(&mut self.os_rng);
|
||||||
|
debug!(target: "libp2p-ping", "Preparing for ping with payload {:?}", payload);
|
||||||
// Ignore errors if the ponger has been already destroyed. The returned future will never
|
// Ignore errors if the ponger has been already destroyed. The returned future will never
|
||||||
// be signalled.
|
// be signalled.
|
||||||
let fut = self.send.clone().send(Message::Ping(Bytes::from(payload.to_vec()), tx))
|
let fut = self.send.clone().send(Message::Ping(Bytes::from(payload.to_vec()), tx))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user