mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-30 18:21:33 +00:00
Add some logging when disconnecting a TCP stream (#352)
* Add some logging when disconnecting a TCP stream * More logging
This commit is contained in:
@ -9,8 +9,8 @@ libp2p-core = { path = "../core" }
|
|||||||
log = "0.4.1"
|
log = "0.4.1"
|
||||||
futures = "0.1"
|
futures = "0.1"
|
||||||
multiaddr = { path = "../multiaddr" }
|
multiaddr = { path = "../multiaddr" }
|
||||||
|
tokio-io = "0.1"
|
||||||
tokio-tcp = "0.1"
|
tokio-tcp = "0.1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio-current-thread = "0.1"
|
tokio-current-thread = "0.1"
|
||||||
tokio-io = "0.1"
|
|
||||||
|
@ -46,21 +46,22 @@ extern crate libp2p_core as swarm;
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
extern crate multiaddr;
|
extern crate multiaddr;
|
||||||
|
extern crate tokio_io;
|
||||||
extern crate tokio_tcp;
|
extern crate tokio_tcp;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
extern crate tokio_current_thread;
|
extern crate tokio_current_thread;
|
||||||
#[cfg(test)]
|
|
||||||
extern crate tokio_io;
|
|
||||||
|
|
||||||
|
use futures::Poll;
|
||||||
use futures::future::{self, Future, FutureResult};
|
use futures::future::{self, Future, FutureResult};
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
use multiaddr::{AddrComponent, Multiaddr, ToMultiaddr};
|
use multiaddr::{AddrComponent, Multiaddr, ToMultiaddr};
|
||||||
use std::io::Error as IoError;
|
use std::io::{Error as IoError, Read, Write};
|
||||||
use std::iter;
|
use std::iter;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use swarm::Transport;
|
use swarm::Transport;
|
||||||
use tokio_tcp::{TcpListener, TcpStream};
|
use tokio_tcp::{TcpListener, TcpStream};
|
||||||
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
/// Represents the configuration for a TCP/IP transport capability for libp2p.
|
/// Represents the configuration for a TCP/IP transport capability for libp2p.
|
||||||
///
|
///
|
||||||
@ -78,11 +79,11 @@ impl TcpConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Transport for TcpConfig {
|
impl Transport for TcpConfig {
|
||||||
type Output = TcpStream;
|
type Output = TcpTransStream;
|
||||||
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
||||||
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
|
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
|
||||||
type MultiaddrFuture = FutureResult<Multiaddr, IoError>;
|
type MultiaddrFuture = FutureResult<Multiaddr, IoError>;
|
||||||
type Dial = Box<Future<Item = (TcpStream, Self::MultiaddrFuture), Error = IoError>>;
|
type Dial = Box<Future<Item = (TcpTransStream, Self::MultiaddrFuture), Error = IoError>>;
|
||||||
|
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
|
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
|
||||||
@ -106,16 +107,21 @@ impl Transport for TcpConfig {
|
|||||||
let future = future::result(listener)
|
let future = future::result(listener)
|
||||||
.map(|listener| {
|
.map(|listener| {
|
||||||
// Pull out a stream of sockets for incoming connections
|
// Pull out a stream of sockets for incoming connections
|
||||||
listener.incoming().map(|sock| {
|
listener.incoming()
|
||||||
let addr = match sock.peer_addr() {
|
.map(|sock| {
|
||||||
Ok(addr) => addr.to_multiaddr()
|
let addr = match sock.peer_addr() {
|
||||||
.expect("generating a multiaddr from a socket addr never fails"),
|
Ok(addr) => addr.to_multiaddr()
|
||||||
Err(err) => return future::err(err),
|
.expect("generating a multiaddr from a socket addr never fails"),
|
||||||
};
|
Err(err) => return future::err(err),
|
||||||
|
};
|
||||||
|
|
||||||
debug!("Incoming connection from {}", addr);
|
debug!("Incoming connection from {}", addr);
|
||||||
future::ok((sock, future::ok(addr)))
|
future::ok((TcpTransStream { inner: sock }, future::ok(addr)))
|
||||||
})
|
})
|
||||||
|
.map_err(|err| {
|
||||||
|
debug!("Error in TCP listener: {:?}", err);
|
||||||
|
err
|
||||||
|
})
|
||||||
})
|
})
|
||||||
.flatten_stream();
|
.flatten_stream();
|
||||||
Ok((Box::new(future), new_addr))
|
Ok((Box::new(future), new_addr))
|
||||||
@ -130,7 +136,14 @@ impl Transport for TcpConfig {
|
|||||||
// If so, we instantly refuse dialing instead of going through the kernel.
|
// If so, we instantly refuse dialing instead of going through the kernel.
|
||||||
if socket_addr.port() != 0 && !socket_addr.ip().is_unspecified() {
|
if socket_addr.port() != 0 && !socket_addr.ip().is_unspecified() {
|
||||||
debug!("Dialing {}", addr);
|
debug!("Dialing {}", addr);
|
||||||
let fut = TcpStream::connect(&socket_addr).map(|t| (t, future::ok(addr)));
|
let fut = TcpStream::connect(&socket_addr)
|
||||||
|
.map(|t| {
|
||||||
|
(TcpTransStream { inner: t }, future::ok(addr))
|
||||||
|
})
|
||||||
|
.map_err(move |err| {
|
||||||
|
debug!("Error while dialing {:?} => {:?}", socket_addr, err);
|
||||||
|
err
|
||||||
|
});
|
||||||
Ok(Box::new(fut) as Box<_>)
|
Ok(Box::new(fut) as Box<_>)
|
||||||
} else {
|
} else {
|
||||||
debug!("Instantly refusing dialing {}, as it is invalid", addr);
|
debug!("Instantly refusing dialing {}, as it is invalid", addr);
|
||||||
@ -190,6 +203,51 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wraps around a `TcpStream` and adds logging for important events.
|
||||||
|
pub struct TcpTransStream {
|
||||||
|
inner: TcpStream
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Read for TcpTransStream {
|
||||||
|
#[inline]
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
|
||||||
|
self.inner.read(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncRead for TcpTransStream {
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Write for TcpTransStream {
|
||||||
|
#[inline]
|
||||||
|
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
|
||||||
|
self.inner.write(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn flush(&mut self) -> Result<(), IoError> {
|
||||||
|
self.inner.flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncWrite for TcpTransStream {
|
||||||
|
#[inline]
|
||||||
|
fn shutdown(&mut self) -> Poll<(), IoError> {
|
||||||
|
AsyncWrite::shutdown(&mut self.inner)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for TcpTransStream {
|
||||||
|
#[inline]
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Ok(addr) = self.inner.peer_addr() {
|
||||||
|
debug!("Dropped TCP connection to {:?}", addr);
|
||||||
|
} else {
|
||||||
|
debug!("Dropped TCP connection to undeterminate peer");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::{multiaddr_to_socketaddr, TcpConfig};
|
use super::{multiaddr_to_socketaddr, TcpConfig};
|
||||||
|
Reference in New Issue
Block a user