Use the new version of tokio (#303)

This commit is contained in:
Pierre Krieger
2018-07-16 12:15:27 +02:00
committed by GitHub
parent e74e3f4950
commit 16e3453b7f
41 changed files with 231 additions and 342 deletions

View File

@ -27,22 +27,14 @@
//!
//! # Usage
//!
//! Create [a tokio `Core`](https://docs.rs/tokio-core/0.1/tokio_core/reactor/struct.Core.html),
//! then grab a handle by calling the `handle()` method on it, then create a `TcpConfig` and pass
//! the handle.
//!
//! Example:
//!
//! ```
//! extern crate libp2p_tcp_transport;
//! extern crate tokio_core;
//!
//! use libp2p_tcp_transport::TcpConfig;
//! use tokio_core::reactor::Core;
//!
//! # fn main() {
//! let mut core = Core::new().unwrap();
//! let tcp = TcpConfig::new(core.handle());
//! let tcp = TcpConfig::new();
//! # }
//! ```
//!
@ -54,7 +46,11 @@ extern crate libp2p_core as swarm;
#[macro_use]
extern crate log;
extern crate multiaddr;
extern crate tokio_core;
extern crate tokio_tcp;
#[cfg(test)]
extern crate tokio_current_thread;
#[cfg(test)]
extern crate tokio_io;
use futures::future::{self, Future, FutureResult};
@ -64,25 +60,21 @@ use std::io::Error as IoError;
use std::iter;
use std::net::SocketAddr;
use swarm::Transport;
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::Handle;
use tokio_tcp::{TcpListener, TcpStream};
/// Represents the configuration for a TCP/IP transport capability for libp2p.
///
/// Each connection created by this config is tied to a tokio reactor. The TCP sockets created by
/// libp2p will need to be progressed by running the futures and streams obtained by libp2p
/// through the tokio reactor.
/// The TCP sockets created by libp2p will need to be progressed by running the futures and streams
/// obtained by libp2p through the tokio reactor.
#[derive(Debug, Clone)]
pub struct TcpConfig {
event_loop: Handle,
}
impl TcpConfig {
/// Creates a new configuration object for TCP/IP. The `Handle` is a tokio reactor the
/// connections will be created with.
/// Creates a new configuration object for TCP/IP.
#[inline]
pub fn new(handle: Handle) -> TcpConfig {
TcpConfig { event_loop: handle }
pub fn new() -> TcpConfig {
TcpConfig {}
}
}
@ -95,7 +87,7 @@ impl Transport for TcpConfig {
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
let listener = TcpListener::bind(&socket_addr, &self.event_loop);
let listener = TcpListener::bind(&socket_addr);
// We need to build the `Multiaddr` to return from this function. If an error happened,
// just return the original multiaddr.
let new_addr = match listener {
@ -115,9 +107,13 @@ impl Transport for TcpConfig {
let future = future::result(listener)
.map(|listener| {
// Pull out a stream of sockets for incoming connections
listener.incoming().map(|(sock, addr)| {
let addr = addr.to_multiaddr()
.expect("generating a multiaddr from a socket addr never fails");
listener.incoming().map(|sock| {
let addr = match sock.peer_addr() {
Ok(addr) => addr.to_multiaddr()
.expect("generating a multiaddr from a socket addr never fails"),
Err(err) => return future::err(err),
};
debug!("Incoming connection from {}", addr);
future::ok((sock, future::ok(addr)))
})
@ -135,7 +131,7 @@ impl Transport for TcpConfig {
// If so, we instantly refuse dialing instead of going through the kernel.
if socket_addr.port() != 0 && !socket_addr.ip().is_unspecified() {
debug!("Dialing {}", addr);
let fut = TcpStream::connect(&socket_addr, &self.event_loop)
let fut = TcpStream::connect(&socket_addr)
.map(|t| (t, future::ok(addr)));
Ok(Box::new(fut) as Box<_>)
} else {
@ -205,7 +201,7 @@ mod tests {
use std;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use swarm::Transport;
use tokio_core::reactor::Core;
use tokio_current_thread;
use tokio_io;
#[test]
@ -258,10 +254,8 @@ mod tests {
use std::io::Write;
std::thread::spawn(move || {
let mut core = Core::new().unwrap();
let addr = "/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap();
let tcp = TcpConfig::new(core.handle());
let handle = core.handle();
let tcp = TcpConfig::new();
let listener = tcp.listen_on(addr).unwrap().0.for_each(|sock| {
sock.and_then(|(sock, _)| {
// Define what to do with the socket that just connected to us
@ -271,37 +265,32 @@ mod tests {
.map_err(|err| panic!("IO error {:?}", err));
// Spawn the future as a concurrent task
handle.spawn(handle_conn);
tokio_current_thread::spawn(handle_conn);
Ok(())
})
});
core.run(listener).unwrap();
tokio_current_thread::block_on_all(listener).unwrap();
});
std::thread::sleep(std::time::Duration::from_millis(100));
let addr = "/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap();
let mut core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle());
let tcp = TcpConfig::new();
// Obtain a future socket through dialing
let socket = tcp.dial(addr.clone()).unwrap();
// Define what to do with the socket once it's obtained
let action = socket.then(|sock| match sock {
Ok((mut s, _)) => {
let written = s.write(&[0x1, 0x2, 0x3]).unwrap();
Ok(written)
}
Err(x) => Err(x),
let action = socket.then(|sock| -> Result<(), ()> {
sock.unwrap().0.write(&[0x1, 0x2, 0x3]).unwrap();
Ok(())
});
// Execute the future in our event loop
core.run(action).unwrap();
tokio_current_thread::block_on_all(action).unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
}
#[test]
fn replace_port_0_in_returned_multiaddr_ipv4() {
let core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle());
let tcp = TcpConfig::new();
let addr = "/ip4/127.0.0.1/tcp/0".parse::<Multiaddr>().unwrap();
assert!(addr.to_string().contains("tcp/0"));
@ -312,8 +301,7 @@ mod tests {
#[test]
fn replace_port_0_in_returned_multiaddr_ipv6() {
let core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle());
let tcp = TcpConfig::new();
let addr: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap();
assert!(addr.to_string().contains("tcp/0"));
@ -324,8 +312,7 @@ mod tests {
#[test]
fn larger_addr_denied() {
let core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle());
let tcp = TcpConfig::new();
let addr = "/ip4/127.0.0.1/tcp/12345/tcp/12345"
.parse::<Multiaddr>()
@ -335,8 +322,7 @@ mod tests {
#[test]
fn nat_traversal() {
let core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle());
let tcp = TcpConfig::new();
let server = "/ip4/127.0.0.1/tcp/10000".parse::<Multiaddr>().unwrap();
let observed = "/ip4/80.81.82.83/tcp/25000".parse::<Multiaddr>().unwrap();