Populate TcpConfig with more config (#454)

This commit is contained in:
Pierre Krieger
2018-09-11 12:04:35 +02:00
committed by GitHub
parent fd4b7488be
commit 5ecdb71c29

View File

@ -68,7 +68,18 @@ use tokio_tcp::{ConnectFuture, Incoming, TcpListener, TcpStream};
/// obtained by libp2p through the tokio reactor.
#[derive(Debug, Clone, Default)]
pub struct TcpConfig {
/// How long a listener should sleep after receiving an error, before trying again.
sleep_on_error: Duration,
/// Size of the recv buffer size to set for opened sockets, or `None` to keep default.
recv_buffer_size: Option<usize>,
/// Size of the send buffer size to set for opened sockets, or `None` to keep default.
send_buffer_size: Option<usize>,
/// TTL to set for opened sockets, or `None` to keep default.
ttl: Option<u32>,
/// Keep alive duration to set for opened sockets, or `None` to keep default.
keepalive: Option<Option<Duration>>,
/// `TCP_NODELAY` to set for opened sockets, or `None` to keep default.
nodelay: Option<bool>,
}
impl TcpConfig {
@ -77,8 +88,48 @@ impl TcpConfig {
pub fn new() -> TcpConfig {
TcpConfig {
sleep_on_error: Duration::from_millis(100),
recv_buffer_size: None,
send_buffer_size: None,
ttl: None,
keepalive: None,
nodelay: None,
}
}
/// Sets the size of the recv buffer size to set for opened sockets.
#[inline]
pub fn recv_buffer_size(mut self, value: usize) -> Self {
self.recv_buffer_size = Some(value);
self
}
/// Sets the size of the send buffer size to set for opened sockets.
#[inline]
pub fn send_buffer_size(mut self, value: usize) -> Self {
self.send_buffer_size = Some(value);
self
}
/// Sets the TTL to set for opened sockets.
#[inline]
pub fn ttl(mut self, value: u32) -> Self {
self.ttl = Some(value);
self
}
/// Sets the keep alive pinging duration to set for opened sockets.
#[inline]
pub fn keepalive(mut self, value: Option<Duration>) -> Self {
self.keepalive = Some(value);
self
}
/// Sets the `TCP_NODELAY` to set for opened sockets.
#[inline]
pub fn nodelay(mut self, value: bool) -> Self {
self.nodelay = Some(value);
self
}
}
impl Transport for TcpConfig {
@ -110,7 +161,13 @@ impl Transport for TcpConfig {
let inner = listener
.map_err(Some)
.map(move |l| l.incoming().sleep_on_error(sleep_on_error));
Ok((TcpListenStream { inner }, new_addr))
Ok((
TcpListenStream {
inner,
config: self,
},
new_addr,
))
} else {
Err((self, addr))
}
@ -124,6 +181,7 @@ impl Transport for TcpConfig {
debug!("Dialing {}", addr);
Ok(TcpDialFut {
inner: TcpStream::connect(&socket_addr),
config: self,
addr: Some(addr),
})
} else {
@ -190,11 +248,38 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
}
}
/// Applies the socket configuration parameters to a socket.
fn apply_config(config: &TcpConfig, socket: &TcpStream) -> Result<(), IoError> {
if let Some(recv_buffer_size) = config.recv_buffer_size {
socket.set_recv_buffer_size(recv_buffer_size)?;
}
if let Some(send_buffer_size) = config.send_buffer_size {
socket.set_send_buffer_size(send_buffer_size)?;
}
if let Some(ttl) = config.ttl {
socket.set_ttl(ttl)?;
}
if let Some(keepalive) = config.keepalive {
socket.set_keepalive(keepalive)?;
}
if let Some(nodelay) = config.nodelay {
socket.set_nodelay(nodelay)?;
}
Ok(())
}
/// Future that dials a TCP/IP address.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct TcpDialFut {
inner: ConnectFuture,
/// Original configuration.
config: TcpConfig,
/// Address we're dialing. Extracted when the `Future` finishes.
addr: Option<Multiaddr>,
}
@ -206,6 +291,7 @@ impl Future for TcpDialFut {
fn poll(&mut self) -> Poll<(TcpTransStream, FutureResult<Multiaddr, IoError>), IoError> {
match self.inner.poll() {
Ok(Async::Ready(stream)) => {
apply_config(&self.config, &stream)?;
let addr = self
.addr
.take()
@ -229,6 +315,8 @@ impl Future for TcpDialFut {
/// Stream that listens on an TCP/IP address.
pub struct TcpListenStream {
inner: Result<SleepOnError<Incoming>, Option<IoError>>,
/// Original configuration.
config: TcpConfig,
}
impl Stream for TcpListenStream {
@ -250,6 +338,11 @@ impl Stream for TcpListenStream {
match inner.poll() {
Ok(Async::Ready(Some(sock))) => {
match apply_config(&self.config, &sock) {
Ok(()) => (),
Err(err) => return Ok(Async::Ready(Some(future::err(err)))),
};
let addr = match sock.peer_addr() {
// TODO: remove this expect()
Ok(addr) => addr