Add log statements to libp2p-mplex. (#179)

* Add log statements to libp2p-mplex.

Addresses issue #126.

* Run `cargo fmt`.

* Wording.

* Adjust log levels.
This commit is contained in:
Toralf Wittner
2018-05-08 19:20:57 +02:00
committed by Pierre Krieger
parent fd0d75f61e
commit 773cf6c26d
5 changed files with 68 additions and 21 deletions

View File

@ -4,19 +4,20 @@ version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
[dependencies] [dependencies]
bytes = "0.4.5"
num-traits = "0.1.40"
num-bigint = { version = "0.1.40", default-features = false }
tokio-io = "0.1"
futures = "0.1"
parking_lot = "0.4.8"
arrayvec = "0.4.6" arrayvec = "0.4.6"
rand = "0.3.17" bytes = "0.4.5"
circular-buffer = { path = "../circular-buffer" } circular-buffer = { path = "../circular-buffer" }
libp2p-swarm = { path = "../swarm" }
varint = { path = "../varint-rs" }
error-chain = "0.11.0" error-chain = "0.11.0"
futures = "0.1"
futures-mutex = { git = "https://github.com/paritytech/futures-mutex" } futures-mutex = { git = "https://github.com/paritytech/futures-mutex" }
libp2p-swarm = { path = "../swarm" }
log = "0.4"
num-bigint = { version = "0.1.40", default-features = false }
num-traits = "0.1.40"
parking_lot = "0.4.8"
rand = "0.3.17"
tokio-io = "0.1"
varint = { path = "../varint-rs" }
[dev-dependencies] [dev-dependencies]
libp2p-tcp-transport = { path = "../tcp-transport" } libp2p-tcp-transport = { path = "../tcp-transport" }

View File

@ -26,6 +26,8 @@ extern crate error_chain;
extern crate futures; extern crate futures;
extern crate futures_mutex; extern crate futures_mutex;
extern crate libp2p_swarm as swarm; extern crate libp2p_swarm as swarm;
#[macro_use]
extern crate log;
extern crate num_bigint; extern crate num_bigint;
extern crate num_traits; extern crate num_traits;
extern crate parking_lot; extern crate parking_lot;

View File

@ -30,7 +30,17 @@ use circular_buffer::Array;
pub enum NextMultiplexState { pub enum NextMultiplexState {
NewStream(u32), NewStream(u32),
ParsingMessageBody(u32), ParsingMessageBody(u32),
Ignore, Ignore(u32),
}
impl NextMultiplexState {
pub fn substream_id(&self) -> u32 {
match *self {
NextMultiplexState::NewStream(id)
| NextMultiplexState::ParsingMessageBody(id)
| NextMultiplexState::Ignore(id) => id,
}
}
} }
pub enum MultiplexReadState { pub enum MultiplexReadState {
@ -51,6 +61,7 @@ pub enum MultiplexReadState {
remaining_bytes: usize, remaining_bytes: usize,
}, },
Ignore { Ignore {
substream_id: u32,
remaining_bytes: usize, remaining_bytes: usize,
}, },
} }
@ -231,6 +242,7 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
substream_id, substream_id,
packet_type, packet_type,
} = MultiplexHeader::parse(header).map_err(|err| { } = MultiplexHeader::parse(header).map_err(|err| {
debug!(target: "libp2p-mplex", "failed to parse header: {}", err);
io::Error::new( io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
format!("Error parsing header: {:?}", err), format!("Error parsing header: {:?}", err),
@ -254,7 +266,7 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
PacketType::Close(_) | PacketType::Reset(_) => { PacketType::Close(_) | PacketType::Reset(_) => {
lock.read_state = Some(BodyLength { lock.read_state = Some(BodyLength {
state: Default::default(), state: Default::default(),
next: NextMultiplexState::Ignore, next: NextMultiplexState::Ignore(substream_id),
}); });
lock.close_stream(substream_id); lock.close_stream(substream_id);
@ -269,8 +281,10 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
} }
Err(error) => { Err(error) => {
return if let varint::Error(varint::ErrorKind::Io(inner), ..) = error { return if let varint::Error(varint::ErrorKind::Io(inner), ..) = error {
debug!(target: "libp2p-mplex", "failed to read header: {}", inner);
Err(inner) Err(inner)
} else { } else {
debug!(target: "libp2p-mplex", "failed to read header: {}", error);
Err(io::Error::new(io::ErrorKind::Other, error.description())) Err(io::Error::new(io::ErrorKind::Other, error.description()))
}; };
} }
@ -282,10 +296,15 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
} => { } => {
use self::NextMultiplexState::*; use self::NextMultiplexState::*;
match varint_state let body_len = varint_state.read(&mut lock.stream).map_err(|e| {
.read(&mut lock.stream) debug!(target: "libp2p-mplex",
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Error reading varint"))? "substream {}: failed to read body length: {}",
{ next.substream_id(),
e);
io::Error::new(io::ErrorKind::Other, "Error reading varint")
})?;
match body_len {
Async::Ready(length) => { Async::Ready(length) => {
// TODO: Limit `length` to prevent resource-exhaustion DOS // TODO: Limit `length` to prevent resource-exhaustion DOS
let length = if let Some(length) = length { let length = if let Some(length) = length {
@ -295,7 +314,8 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
}; };
lock.read_state = match next { lock.read_state = match next {
Ignore => Some(MultiplexReadState::Ignore { Ignore(substream_id) => Some(MultiplexReadState::Ignore {
substream_id,
remaining_bytes: length, remaining_bytes: length,
}), }),
NewStream(substream_id) => { NewStream(substream_id) => {
@ -325,6 +345,7 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
}) })
} else { } else {
Some(MultiplexReadState::Ignore { Some(MultiplexReadState::Ignore {
substream_id,
remaining_bytes: length, remaining_bytes: length,
}) })
} }
@ -374,12 +395,15 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
return on_block; return on_block;
} }
Err(other) => { Err(other) => {
debug!(target: "libp2p-mplex",
"substream {}: failed to read new stream: {}",
substream_id,
other);
lock.read_state = Some(NewStream { lock.read_state = Some(NewStream {
substream_id, substream_id,
name, name,
remaining_bytes, remaining_bytes,
}); });
return Err(other); return Err(other);
} }
} }
@ -435,6 +459,10 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
return on_block; return on_block;
} }
Err(other) => { Err(other) => {
debug!(target: "libp2p-mplex",
"substream {}: failed to read message body: {}",
substream_id,
other);
return Err(other); return Err(other);
} }
} }
@ -452,6 +480,7 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
} }
} }
Ignore { Ignore {
substream_id,
mut remaining_bytes, mut remaining_bytes,
} => { } => {
let mut ignore_buf: [u8; 256] = [0; 256]; let mut ignore_buf: [u8; 256] = [0; 256];
@ -466,17 +495,26 @@ fn read_stream_internal<T: AsyncRead, Buf: Array<Item = u8>>(
Ok(consumed) => { Ok(consumed) => {
remaining_bytes -= consumed; remaining_bytes -= consumed;
lock.read_state = Some(Ignore { lock.read_state = Some(Ignore {
substream_id,
remaining_bytes: remaining_bytes, remaining_bytes: remaining_bytes,
}); });
} }
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
lock.read_state = Some(Ignore { remaining_bytes }); lock.read_state = Some(Ignore {
substream_id,
remaining_bytes,
});
return on_block; return on_block;
} }
Err(other) => { Err(other) => {
lock.read_state = Some(Ignore { remaining_bytes }); debug!(target: "libp2p-mplex",
"substream {}: failed to read ignore bytes: {}",
substream_id,
other);
lock.read_state = Some(Ignore {
substream_id,
remaining_bytes,
});
return Err(other); return Err(other);
} }
} }

View File

@ -97,6 +97,7 @@ impl<T, Buf: Array> MultiplexShared<T, Buf> {
} }
pub fn open_stream(&mut self, id: u32) -> bool { pub fn open_stream(&mut self, id: u32) -> bool {
trace!(target: "libp2p-mplex", "open stream {}", id);
self.open_streams self.open_streams
.entry(id) .entry(id)
.or_insert(SubstreamMetadata::new_open()) .or_insert(SubstreamMetadata::new_open())
@ -104,6 +105,7 @@ impl<T, Buf: Array> MultiplexShared<T, Buf> {
} }
pub fn close_stream(&mut self, id: u32) { pub fn close_stream(&mut self, id: u32) {
trace!(target: "libp2p-mplex", "close stream {}", id);
self.open_streams.insert(id, SubstreamMetadata::Closed); self.open_streams.insert(id, SubstreamMetadata::Closed);
} }
} }

View File

@ -207,6 +207,10 @@ pub fn write_stream<Buf: circular_buffer::Array, T: AsyncWrite>(
Err(Some(Body { size })) Err(Some(Body { size }))
} }
Err(other) => { Err(other) => {
debug!(target: "libp2p-mplex",
"substream {}: failed to write body: {}",
id,
other);
return Err(other); return Err(other);
} }
} }