diff --git a/tcp-transport/Cargo.toml b/tcp-transport/Cargo.toml index 1e77a06f..837bf7d6 100644 --- a/tcp-transport/Cargo.toml +++ b/tcp-transport/Cargo.toml @@ -9,8 +9,8 @@ libp2p-core = { path = "../core" } log = "0.4.1" futures = "0.1" multiaddr = { path = "../multiaddr" } +tokio-io = "0.1" tokio-tcp = "0.1" [dev-dependencies] tokio-current-thread = "0.1" -tokio-io = "0.1" diff --git a/tcp-transport/src/lib.rs b/tcp-transport/src/lib.rs index 10d28438..779e8795 100644 --- a/tcp-transport/src/lib.rs +++ b/tcp-transport/src/lib.rs @@ -46,21 +46,22 @@ extern crate libp2p_core as swarm; #[macro_use] extern crate log; extern crate multiaddr; +extern crate tokio_io; extern crate tokio_tcp; #[cfg(test)] extern crate tokio_current_thread; -#[cfg(test)] -extern crate tokio_io; +use futures::Poll; use futures::future::{self, Future, FutureResult}; use futures::stream::Stream; use multiaddr::{AddrComponent, Multiaddr, ToMultiaddr}; -use std::io::Error as IoError; +use std::io::{Error as IoError, Read, Write}; use std::iter; use std::net::SocketAddr; use swarm::Transport; use tokio_tcp::{TcpListener, TcpStream}; +use tokio_io::{AsyncRead, AsyncWrite}; /// Represents the configuration for a TCP/IP transport capability for libp2p. /// @@ -78,11 +79,11 @@ impl TcpConfig { } impl Transport for TcpConfig { - type Output = TcpStream; + type Output = TcpTransStream; type Listener = Box>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; type MultiaddrFuture = FutureResult; - type Dial = Box>; + type Dial = Box>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { @@ -106,16 +107,21 @@ impl Transport for TcpConfig { let future = future::result(listener) .map(|listener| { // Pull out a stream of sockets for incoming connections - listener.incoming().map(|sock| { - let addr = match sock.peer_addr() { - Ok(addr) => addr.to_multiaddr() - .expect("generating a multiaddr from a socket addr never fails"), - Err(err) => return future::err(err), - }; + listener.incoming() + .map(|sock| { + let addr = match sock.peer_addr() { + Ok(addr) => addr.to_multiaddr() + .expect("generating a multiaddr from a socket addr never fails"), + Err(err) => return future::err(err), + }; - debug!("Incoming connection from {}", addr); - future::ok((sock, future::ok(addr))) - }) + debug!("Incoming connection from {}", addr); + future::ok((TcpTransStream { inner: sock }, future::ok(addr))) + }) + .map_err(|err| { + debug!("Error in TCP listener: {:?}", err); + err + }) }) .flatten_stream(); Ok((Box::new(future), new_addr)) @@ -130,7 +136,14 @@ impl Transport for TcpConfig { // If so, we instantly refuse dialing instead of going through the kernel. if socket_addr.port() != 0 && !socket_addr.ip().is_unspecified() { debug!("Dialing {}", addr); - let fut = TcpStream::connect(&socket_addr).map(|t| (t, future::ok(addr))); + let fut = TcpStream::connect(&socket_addr) + .map(|t| { + (TcpTransStream { inner: t }, future::ok(addr)) + }) + .map_err(move |err| { + debug!("Error while dialing {:?} => {:?}", socket_addr, err); + err + }); Ok(Box::new(fut) as Box<_>) } else { debug!("Instantly refusing dialing {}, as it is invalid", addr); @@ -190,6 +203,51 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { } } +/// Wraps around a `TcpStream` and adds logging for important events. +pub struct TcpTransStream { + inner: TcpStream +} + +impl Read for TcpTransStream { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + self.inner.read(buf) + } +} + +impl AsyncRead for TcpTransStream { +} + +impl Write for TcpTransStream { + #[inline] + fn write(&mut self, buf: &[u8]) -> Result { + self.inner.write(buf) + } + + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + self.inner.flush() + } +} + +impl AsyncWrite for TcpTransStream { + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + AsyncWrite::shutdown(&mut self.inner) + } +} + +impl Drop for TcpTransStream { + #[inline] + fn drop(&mut self) { + if let Ok(addr) = self.inner.peer_addr() { + debug!("Dropped TCP connection to {:?}", addr); + } else { + debug!("Dropped TCP connection to undeterminate peer"); + } + } +} + #[cfg(test)] mod tests { use super::{multiaddr_to_socketaddr, TcpConfig};