diff --git a/core/Cargo.toml b/core/Cargo.toml index 265e504a..60117bd1 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -23,4 +23,4 @@ libp2p-ping = { path = "../ping" } libp2p-tcp-transport = { path = "../tcp-transport" } libp2p-mplex = { path = "../mplex" } rand = "0.5" -tokio-core = "0.1" +tokio-current-thread = "0.1" diff --git a/core/README.md b/core/README.md index 5bb621be..67563436 100644 --- a/core/README.md +++ b/core/README.md @@ -66,7 +66,7 @@ connection upgrade. ```rust extern crate libp2p_core; extern crate libp2p_tcp_transport; -extern crate tokio_core; +extern crate tokio_current_thread; use libp2p_core::Transport; @@ -111,7 +111,7 @@ extern crate futures; extern crate libp2p_ping; extern crate libp2p_core; extern crate libp2p_tcp_transport; -extern crate tokio_core; +extern crate tokio_current_thread; use futures::Future; use libp2p_ping::Ping; @@ -119,7 +119,7 @@ use libp2p_core::Transport; let mut core = tokio_core::reactor::Core::new().unwrap(); -let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) +let ping_finished_future = libp2p_tcp_transport::TcpConfig::new() // We have a `TcpConfig` struct that implements `Transport`, and apply a `Ping` upgrade on it. .with_upgrade(Ping) // TODO: right now the only available protocol is ping, but we want to replace it with @@ -130,7 +130,7 @@ let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) }); // Runs until the ping arrives. -core.run(ping_finished_future).unwrap(); +tokio_current_thread::block_on_all(ping_finished_future).unwrap(); ``` ## Grouping protocols @@ -151,7 +151,7 @@ extern crate futures; extern crate libp2p_ping; extern crate libp2p_core; extern crate libp2p_tcp_transport; -extern crate tokio_core; +extern crate tokio_current_thread; use futures::Future; use libp2p_ping::Ping; @@ -159,7 +159,7 @@ use libp2p_core::Transport; let mut core = tokio_core::reactor::Core::new().unwrap(); -let transport = libp2p_tcp_transport::TcpConfig::new(core.handle()) +let transport = libp2p_tcp_transport::TcpConfig::new() .with_dummy_muxing(); let (swarm_controller, swarm_future) = libp2p_core::swarm(transport, Ping, |(mut pinger, service), client_addr| { @@ -172,5 +172,5 @@ let (swarm_controller, swarm_future) = libp2p_core::swarm(transport, Ping, |(mut swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); // Runs until everything is finished. -core.run(swarm_future).unwrap(); +tokio_current_thread::block_on_all(swarm_future).unwrap(); ``` diff --git a/core/src/lib.rs b/core/src/lib.rs index c865f755..f70f1522 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -87,13 +87,11 @@ //! ``` //! extern crate libp2p_core; //! extern crate libp2p_tcp_transport; -//! extern crate tokio_core; //! //! use libp2p_core::Transport; //! //! # fn main() { -//! let tokio_core = tokio_core::reactor::Core::new().unwrap(); -//! let tcp_transport = libp2p_tcp_transport::TcpConfig::new(tokio_core.handle()); +//! let tcp_transport = libp2p_tcp_transport::TcpConfig::new(); //! let upgraded = tcp_transport.with_upgrade(libp2p_core::upgrade::PlainTextConfig); //! //! // upgraded.dial(...) // automatically applies the plain text protocol on the socket @@ -134,16 +132,14 @@ //! extern crate libp2p_ping; //! extern crate libp2p_core; //! extern crate libp2p_tcp_transport; -//! extern crate tokio_core; +//! extern crate tokio_current_thread; //! //! use futures::Future; //! use libp2p_ping::Ping; //! use libp2p_core::Transport; //! //! # fn main() { -//! let mut core = tokio_core::reactor::Core::new().unwrap(); -//! -//! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) +//! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new() //! // We have a `TcpConfig` struct that implements `Transport`, and apply a `Ping` upgrade on it. //! .with_upgrade(Ping) //! // TODO: right now the only available protocol is ping, but we want to replace it with @@ -154,7 +150,7 @@ //! }); //! //! // Runs until the ping arrives. -//! core.run(ping_finished_future).unwrap(); +//! tokio_current_thread::block_on_all(ping_finished_future).unwrap(); //! # } //! ``` //! @@ -176,16 +172,14 @@ //! extern crate libp2p_ping; //! extern crate libp2p_core; //! extern crate libp2p_tcp_transport; -//! extern crate tokio_core; +//! extern crate tokio_current_thread; //! //! use futures::Future; //! use libp2p_ping::Ping; //! use libp2p_core::Transport; //! //! # fn main() { -//! let mut core = tokio_core::reactor::Core::new().unwrap(); -//! -//! let transport = libp2p_tcp_transport::TcpConfig::new(core.handle()) +//! let transport = libp2p_tcp_transport::TcpConfig::new() //! .with_dummy_muxing(); //! //! let (swarm_controller, swarm_future) = libp2p_core::swarm(transport.with_upgrade(Ping), @@ -199,7 +193,7 @@ //! swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); //! //! // Runs until everything is finished. -//! core.run(swarm_future).unwrap(); +//! tokio_current_thread::block_on_all(swarm_future).unwrap(); //! # } //! ``` diff --git a/core/tests/multiplex.rs b/core/tests/multiplex.rs index fc7e250c..7f03abcb 100644 --- a/core/tests/multiplex.rs +++ b/core/tests/multiplex.rs @@ -23,7 +23,7 @@ extern crate futures; extern crate libp2p_mplex as multiplex; extern crate libp2p_core; extern crate libp2p_tcp_transport; -extern crate tokio_core; +extern crate tokio_current_thread; extern crate tokio_io; use bytes::BytesMut; @@ -33,7 +33,6 @@ use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport}; use libp2p_tcp_transport::TcpConfig; use std::sync::{atomic, mpsc}; use std::thread; -use tokio_core::reactor::Core; use tokio_io::codec::length_delimited::Framed; // Ensures that a transport is only ever used once for dialing. @@ -78,8 +77,7 @@ fn client_to_server_outbound() { let (tx, rx) = mpsc::channel(); let bg_thread = thread::spawn(move || { - let mut core = Core::new().unwrap(); - let transport = TcpConfig::new(core.handle()) + let transport = TcpConfig::new() .with_upgrade(multiplex::MultiplexConfig::new()) .into_connection_reuse(); @@ -106,11 +104,10 @@ fn client_to_server_outbound() { Ok(()) }); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); }); - let mut core = Core::new().unwrap(); - let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new()); + let transport = TcpConfig::new().with_upgrade(multiplex::MultiplexConfig::new()); let future = transport .dial(rx.recv().unwrap()) @@ -120,7 +117,7 @@ fn client_to_server_outbound() { .and_then(|server| server.send("hello world".into())) .map(|_| ()); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); bg_thread.join().unwrap(); } @@ -132,8 +129,7 @@ fn connection_reused_for_dialing() { let (tx, rx) = mpsc::channel(); let bg_thread = thread::spawn(move || { - let mut core = Core::new().unwrap(); - let transport = OnlyOnce::from(TcpConfig::new(core.handle())) + let transport = OnlyOnce::from(TcpConfig::new()) .with_upgrade(multiplex::MultiplexConfig::new()) .into_connection_reuse(); @@ -171,11 +167,10 @@ fn connection_reused_for_dialing() { Ok(()) }); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); }); - let mut core = Core::new().unwrap(); - let transport = OnlyOnce::from(TcpConfig::new(core.handle())) + let transport = OnlyOnce::from(TcpConfig::new()) .with_upgrade(multiplex::MultiplexConfig::new()) .into_connection_reuse(); @@ -198,7 +193,7 @@ fn connection_reused_for_dialing() { .and_then(|(_first, second)| second.send("second message".into())) .map(|_| ()); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); bg_thread.join().unwrap(); } @@ -211,8 +206,7 @@ fn use_opened_listen_to_dial() { let (tx, rx) = mpsc::channel(); let bg_thread = thread::spawn(move || { - let mut core = Core::new().unwrap(); - let transport = OnlyOnce::from(TcpConfig::new(core.handle())) + let transport = OnlyOnce::from(TcpConfig::new()) .with_upgrade(multiplex::MultiplexConfig::new()); let (listener, addr) = transport @@ -250,11 +244,10 @@ fn use_opened_listen_to_dial() { Ok(()) }); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); }); - let mut core = Core::new().unwrap(); - let transport = OnlyOnce::from(TcpConfig::new(core.handle())) + let transport = OnlyOnce::from(TcpConfig::new()) .with_upgrade(multiplex::MultiplexConfig::new()) .into_connection_reuse(); @@ -277,6 +270,6 @@ fn use_opened_listen_to_dial() { .and_then(|(_first, second)| second.send("second message".into())) .map(|_| ()); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); bg_thread.join().unwrap(); } diff --git a/dns/Cargo.toml b/dns/Cargo.toml index 1e8e1685..c50f322d 100644 --- a/dns/Cargo.toml +++ b/dns/Cargo.toml @@ -8,7 +8,7 @@ libp2p-core = { path = "../core" } log = "0.4.1" futures = "0.1" multiaddr = { path = "../multiaddr" } -tokio-dns-unofficial = "0.1" +tokio-dns-unofficial = "0.3" tokio-io = "0.1" [dev-dependencies] diff --git a/identify/Cargo.toml b/identify/Cargo.toml index bafcff10..cdc5ee88 100644 --- a/identify/Cargo.toml +++ b/identify/Cargo.toml @@ -19,4 +19,4 @@ varint = { path = "../varint-rs" } [dev-dependencies] libp2p-tcp-transport = { path = "../tcp-transport" } -tokio-core = "0.1.0" +tokio-current-thread = "0.1" diff --git a/identify/src/peer_id_transport.rs b/identify/src/peer_id_transport.rs index d48884f8..410ad1be 100644 --- a/identify/src/peer_id_transport.rs +++ b/identify/src/peer_id_transport.rs @@ -292,10 +292,9 @@ fn multiaddr_to_peerid(addr: Multiaddr) -> Result { #[cfg(test)] mod tests { extern crate libp2p_tcp_transport; - extern crate tokio_core; + extern crate tokio_current_thread; use self::libp2p_tcp_transport::TcpConfig; - use self::tokio_core::reactor::Core; use PeerIdTransport; use futures::{Future, Stream}; use libp2p_core::{Transport, PeerId, PublicKey}; @@ -341,9 +340,8 @@ mod tests { let peer_id = PeerId::from_public_key(PublicKey::Ed25519(vec![1, 2, 3, 4])); - let mut core = Core::new().unwrap(); let underlying = UnderlyingTrans { - inner: TcpConfig::new(core.handle()), + inner: TcpConfig::new(), }; let transport = PeerIdTransport::new(underlying, { let peer_id = peer_id.clone(); @@ -358,6 +356,6 @@ mod tests { .unwrap_or_else(|_| panic!()) .then::<_, Result<(), ()>>(|_| Ok(())); - let _ = core.run(future).unwrap(); + let _ = tokio_current_thread::block_on_all(future).unwrap(); } } diff --git a/identify/src/protocol.rs b/identify/src/protocol.rs index 63842765..a9bec925 100644 --- a/identify/src/protocol.rs +++ b/identify/src/protocol.rs @@ -224,10 +224,9 @@ fn parse_proto_msg(msg: BytesMut) -> Result<(IdentifyInfo, Multiaddr), IoError> #[cfg(test)] mod tests { extern crate libp2p_tcp_transport; - extern crate tokio_core; + extern crate tokio_current_thread; use self::libp2p_tcp_transport::TcpConfig; - use self::tokio_core::reactor::Core; use futures::{Future, Stream}; use libp2p_core::{PublicKey, Transport}; use std::sync::mpsc; @@ -242,8 +241,7 @@ mod tests { let (tx, rx) = mpsc::channel(); let bg_thread = thread::spawn(move || { - let mut core = Core::new().unwrap(); - let transport = TcpConfig::new(core.handle()).with_upgrade(IdentifyProtocolConfig); + let transport = TcpConfig::new().with_upgrade(IdentifyProtocolConfig); let (listener, addr) = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -271,11 +269,10 @@ mod tests { _ => panic!(), }); - let _ = core.run(future).unwrap(); + let _ = tokio_current_thread::block_on_all(future).unwrap(); }); - let mut core = Core::new().unwrap(); - let transport = TcpConfig::new(core.handle()).with_upgrade(IdentifyProtocolConfig); + let transport = TcpConfig::new().with_upgrade(IdentifyProtocolConfig); let future = transport .dial(rx.recv().unwrap()) @@ -308,7 +305,7 @@ mod tests { _ => panic!(), }); - let _ = core.run(future).unwrap(); + let _ = tokio_current_thread::block_on_all(future).unwrap(); bg_thread.join().unwrap(); } } diff --git a/kad/Cargo.toml b/kad/Cargo.toml index 3461bca3..02875295 100644 --- a/kad/Cargo.toml +++ b/kad/Cargo.toml @@ -28,4 +28,4 @@ varint = { path = "../varint-rs" } [dev-dependencies] libp2p-tcp-transport = { path = "../tcp-transport" } rand = "0.4.2" -tokio-core = "0.1" +tokio-current-thread = "0.1" diff --git a/kad/src/protocol.rs b/kad/src/protocol.rs index 865c793b..3f8ad791 100644 --- a/kad/src/protocol.rs +++ b/kad/src/protocol.rs @@ -304,10 +304,9 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result Result<(), Box> { @@ -121,15 +120,13 @@ struct ListenerOpts { } fn run_dialer(opts: DialerOpts) -> Result<(), Box> { - let mut core = Core::new()?; - let store = Arc::new(MemoryPeerstore::empty()); for (p, a) in opts.peers { store.peer_or_create(&p).add_addr(a, Duration::from_secs(600)) } let transport = { - let tcp = TcpConfig::new(core.handle()) + let tcp = TcpConfig::new() .with_upgrade(libp2p_yamux::Config::default()) .into_connection_reuse(); RelayTransport::new(opts.me, tcp, store, iter::once(opts.relay)).with_dummy_muxing() @@ -153,18 +150,16 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box> { control.dial(address, transport.with_upgrade(echo)).map_err(|_| "failed to dial")?; - core.run(future).map_err(From::from) + tokio_current_thread::block_on_all(future).map_err(From::from) } fn run_listener(opts: ListenerOpts) -> Result<(), Box> { - let mut core = Core::new()?; - let store = Arc::new(MemoryPeerstore::empty()); for (p, a) in opts.peers { store.peer_or_create(&p).add_addr(a, Duration::from_secs(600)) } - let transport = TcpConfig::new(core.handle()) + let transport = TcpConfig::new() .with_upgrade(libp2p_yamux::Config::default()) .into_connection_reuse(); @@ -207,7 +202,7 @@ fn run_listener(opts: ListenerOpts) -> Result<(), Box> { }); control.listen_on(opts.listen).map_err(|_| "failed to listen")?; - core.run(future).map_err(From::from) + tokio_current_thread::block_on_all(future).map_err(From::from) } // Custom parsers /////////////////////////////////////////////////////////// diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 9baf3d72..888d0bf0 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -21,7 +21,7 @@ pub extern crate bytes; pub extern crate futures; #[cfg(not(target_os = "emscripten"))] -pub extern crate tokio_core; +pub extern crate tokio_current_thread; pub extern crate multiaddr; pub extern crate tokio_io; pub extern crate tokio_codec; @@ -55,7 +55,6 @@ pub use self::transport_timeout::TransportTimeout; /// /// The list currently is TCP/IP, DNS, and WebSockets. However this list could change in the /// future to get new transports. -// TODO: handle the emscripten situation, because we shouldn't depend on tokio-core with emscripten #[derive(Debug, Clone)] pub struct CommonTransport { // The actual implementation of everything. @@ -76,8 +75,8 @@ impl CommonTransport { /// Initializes the `CommonTransport`. #[inline] #[cfg(not(target_os = "emscripten"))] - pub fn new(tokio_handle: tokio_core::reactor::Handle) -> CommonTransport { - let tcp = tcp::TcpConfig::new(tokio_handle); + pub fn new() -> CommonTransport { + let tcp = tcp::TcpConfig::new(); let with_dns = dns::DnsConfig::new(tcp); let with_ws = websocket::WsConfig::new(with_dns.clone()); let inner = with_dns.or_transport(with_ws); diff --git a/mplex/Cargo.toml b/mplex/Cargo.toml index e7648613..1162fc7b 100644 --- a/mplex/Cargo.toml +++ b/mplex/Cargo.toml @@ -21,4 +21,4 @@ varint = { path = "../varint-rs" } [dev-dependencies] libp2p-tcp-transport = { path = "../tcp-transport" } -tokio-core = "0.1" +tokio-current-thread = "0.1" diff --git a/mplex/tests/two_peers.rs b/mplex/tests/two_peers.rs index 97160de1..a455c62f 100644 --- a/mplex/tests/two_peers.rs +++ b/mplex/tests/two_peers.rs @@ -23,7 +23,7 @@ extern crate futures; extern crate libp2p_mplex as multiplex; extern crate libp2p_core as swarm; extern crate libp2p_tcp_transport as tcp; -extern crate tokio_core; +extern crate tokio_current_thread; extern crate tokio_io; use futures::future::Future; @@ -32,7 +32,6 @@ use std::sync::mpsc; use std::thread; use swarm::{StreamMuxer, Transport}; use tcp::TcpConfig; -use tokio_core::reactor::Core; use tokio_io::codec::length_delimited::Framed; #[test] @@ -42,9 +41,8 @@ fn client_to_server_outbound() { let (tx, rx) = mpsc::channel(); let bg_thread = thread::spawn(move || { - let mut core = Core::new().unwrap(); let transport = - TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new()); + TcpConfig::new().with_upgrade(multiplex::MultiplexConfig::new()); let (listener, addr) = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -69,11 +67,10 @@ fn client_to_server_outbound() { Ok(()) }); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); }); - let mut core = Core::new().unwrap(); - let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new()); + let transport = TcpConfig::new().with_upgrade(multiplex::MultiplexConfig::new()); let future = transport .dial(rx.recv().unwrap()) @@ -83,7 +80,7 @@ fn client_to_server_outbound() { .and_then(|server| server.send("hello world".into())) .map(|_| ()); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); bg_thread.join().unwrap(); } @@ -94,9 +91,8 @@ fn client_to_server_inbound() { let (tx, rx) = mpsc::channel(); let bg_thread = thread::spawn(move || { - let mut core = Core::new().unwrap(); let transport = - TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new()); + TcpConfig::new().with_upgrade(multiplex::MultiplexConfig::new()); let (listener, addr) = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -121,11 +117,10 @@ fn client_to_server_inbound() { Ok(()) }); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); }); - let mut core = Core::new().unwrap(); - let transport = TcpConfig::new(core.handle()).with_upgrade(multiplex::MultiplexConfig::new()); + let transport = TcpConfig::new().with_upgrade(multiplex::MultiplexConfig::new()); let future = transport .dial(rx.recv().unwrap()) @@ -135,6 +130,6 @@ fn client_to_server_inbound() { .and_then(|server| server.send("hello world".into())) .map(|_| ()); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); bg_thread.join().unwrap(); } diff --git a/multistream-select/Cargo.toml b/multistream-select/Cargo.toml index 1dfdae9a..c8ad3aed 100644 --- a/multistream-select/Cargo.toml +++ b/multistream-select/Cargo.toml @@ -12,4 +12,5 @@ tokio-io = "0.1" varint = { path = "../varint-rs" } [dev-dependencies] -tokio-core = "0.1" +tokio-current-thread = "0.1" +tokio-tcp = "0.1" diff --git a/multistream-select/README.md b/multistream-select/README.md index 88d3ea81..1f179092 100644 --- a/multistream-select/README.md +++ b/multistream-select/README.md @@ -25,7 +25,7 @@ For a dialer: extern crate bytes; extern crate futures; extern crate multistream_select; -extern crate tokio_core; +extern crate tokio_current_thread; use bytes::Bytes; use multistream_select::dialer_select_proto; @@ -49,7 +49,7 @@ let client = TcpStream::connect(&"127.0.0.1:10333".parse().unwrap(), &core.handl dialer_select_proto(connec, protos).map(|r| r.0) }); -let negotiated_protocol: MyProto = core.run(client).expect("failed to find a protocol"); +let negotiated_protocol: MyProto = tokio_current_thread::block_on_all(client).expect("failed to find a protocol"); println!("negotiated: {:?}", negotiated_protocol); ``` @@ -59,7 +59,7 @@ For a listener: extern crate bytes; extern crate futures; extern crate multistream_select; -extern crate tokio_core; +extern crate tokio_current_thread; use bytes::Bytes; use multistream_select::listener_select_proto; @@ -88,5 +88,5 @@ let server = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()). Ok(()) }); -core.run(server).expect("failed to run server"); +tokio_current_thread::block_on_all(server).expect("failed to run server"); ``` diff --git a/multistream-select/src/lib.rs b/multistream-select/src/lib.rs index e532fd59..8c378417 100644 --- a/multistream-select/src/lib.rs +++ b/multistream-select/src/lib.rs @@ -48,21 +48,19 @@ //! extern crate bytes; //! extern crate futures; //! extern crate multistream_select; -//! extern crate tokio_core; +//! extern crate tokio_current_thread; +//! extern crate tokio_tcp; //! //! # fn main() { //! use bytes::Bytes; //! use multistream_select::dialer_select_proto; //! use futures::{Future, Sink, Stream}; -//! use tokio_core::net::TcpStream; -//! use tokio_core::reactor::Core; -//! -//! let mut core = Core::new().unwrap(); +//! use tokio_tcp::TcpStream; //! //! #[derive(Debug, Copy, Clone)] //! enum MyProto { Echo, Hello } //! -//! let client = TcpStream::connect(&"127.0.0.1:10333".parse().unwrap(), &core.handle()) +//! let client = TcpStream::connect(&"127.0.0.1:10333".parse().unwrap()) //! .from_err() //! .and_then(move |connec| { //! let protos = vec![ @@ -73,7 +71,8 @@ //! dialer_select_proto(connec, protos).map(|r| r.0) //! }); //! -//! let negotiated_protocol: MyProto = core.run(client).expect("failed to find a protocol"); +//! let negotiated_protocol: MyProto = tokio_current_thread::block_on_all(client) +//! .expect("failed to find a protocol"); //! println!("negotiated: {:?}", negotiated_protocol); //! # } //! ``` @@ -84,24 +83,22 @@ //! extern crate bytes; //! extern crate futures; //! extern crate multistream_select; -//! extern crate tokio_core; +//! extern crate tokio_current_thread; +//! extern crate tokio_tcp; //! //! # fn main() { //! use bytes::Bytes; //! use multistream_select::listener_select_proto; //! use futures::{Future, Sink, Stream}; -//! use tokio_core::net::TcpListener; -//! use tokio_core::reactor::Core; -//! -//! let mut core = Core::new().unwrap(); +//! use tokio_tcp::TcpListener; //! //! #[derive(Debug, Copy, Clone)] //! enum MyProto { Echo, Hello } //! -//! let server = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap() +//! let server = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap() //! .incoming() //! .from_err() -//! .and_then(move |(connec, _)| { +//! .and_then(move |connec| { //! let protos = vec![ //! (Bytes::from("/echo/1.0.0"), ::eq, MyProto::Echo), //! (Bytes::from("/hello/2.5.0"), ::eq, MyProto::Hello), @@ -114,7 +111,7 @@ //! Ok(()) //! }); //! -//! core.run(server).expect("failed to run server"); +//! tokio_current_thread::block_on_all(server).expect("failed to run server"); //! # } //! ``` diff --git a/multistream-select/src/protocol/dialer.rs b/multistream-select/src/protocol/dialer.rs index 32e980bc..3ee845bd 100644 --- a/multistream-select/src/protocol/dialer.rs +++ b/multistream-select/src/protocol/dialer.rs @@ -190,9 +190,9 @@ where #[cfg(test)] mod tests { - extern crate tokio_core; - use self::tokio_core::net::{TcpListener, TcpStream}; - use self::tokio_core::reactor::Core; + extern crate tokio_current_thread; + extern crate tokio_tcp; + use self::tokio_tcp::{TcpListener, TcpStream}; use bytes::Bytes; use futures::Future; use futures::{Sink, Stream}; @@ -200,9 +200,7 @@ mod tests { #[test] fn wrong_proto_name() { - let mut core = Core::new().unwrap(); - - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener_addr = listener.local_addr().unwrap(); let server = listener @@ -211,7 +209,7 @@ mod tests { .map(|_| ()) .map_err(|(e, _)| e.into()); - let client = TcpStream::connect(&listener_addr, &core.handle()) + let client = TcpStream::connect(&listener_addr) .from_err() .and_then(move |stream| Dialer::new(stream)) .and_then(move |dialer| { @@ -219,7 +217,7 @@ mod tests { dialer.send(DialerToListenerMessage::ProtocolRequest { name: p }) }); - match core.run(server.join(client)) { + match tokio_current_thread::block_on_all(server.join(client)) { Err(MultistreamSelectError::WrongProtocolName) => (), _ => panic!(), } diff --git a/multistream-select/src/protocol/listener.rs b/multistream-select/src/protocol/listener.rs index f676735f..33871fd9 100644 --- a/multistream-select/src/protocol/listener.rs +++ b/multistream-select/src/protocol/listener.rs @@ -186,9 +186,9 @@ where #[cfg(test)] mod tests { - extern crate tokio_core; - use self::tokio_core::net::{TcpListener, TcpStream}; - use self::tokio_core::reactor::Core; + extern crate tokio_current_thread; + extern crate tokio_tcp; + use self::tokio_tcp::{TcpListener, TcpStream}; use bytes::Bytes; use futures::Future; use futures::{Sink, Stream}; @@ -196,26 +196,24 @@ mod tests { #[test] fn wrong_proto_name() { - let mut core = Core::new().unwrap(); - - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener_addr = listener.local_addr().unwrap(); let server = listener .incoming() .into_future() .map_err(|(e, _)| e.into()) - .and_then(move |(connec, _)| Listener::new(connec.unwrap().0)) + .and_then(move |(connec, _)| Listener::new(connec.unwrap())) .and_then(|listener| { let proto_name = Bytes::from("invalid-proto"); listener.send(ListenerToDialerMessage::ProtocolAck { name: proto_name }) }); - let client = TcpStream::connect(&listener_addr, &core.handle()) + let client = TcpStream::connect(&listener_addr) .from_err() .and_then(move |stream| Dialer::new(stream)); - match core.run(server.join(client)) { + match tokio_current_thread::block_on_all(server.join(client)) { Err(MultistreamSelectError::WrongProtocolName) => (), _ => panic!(), } diff --git a/multistream-select/src/tests.rs b/multistream-select/src/tests.rs index 5f2a1707..a0fa832c 100644 --- a/multistream-select/src/tests.rs +++ b/multistream-select/src/tests.rs @@ -22,11 +22,10 @@ #![cfg(test)] -extern crate tokio_core; +extern crate tokio_current_thread; +extern crate tokio_tcp; -use self::tokio_core::net::TcpListener; -use self::tokio_core::net::TcpStream; -use self::tokio_core::reactor::Core; +use self::tokio_tcp::{TcpListener, TcpStream}; use bytes::Bytes; use dialer_select::{dialer_select_proto_parallel, dialer_select_proto_serial}; use futures::Future; @@ -37,16 +36,14 @@ use {dialer_select_proto, listener_select_proto}; #[test] fn negotiate_with_self_succeeds() { - let mut core = Core::new().unwrap(); - - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener_addr = listener.local_addr().unwrap(); let server = listener .incoming() .into_future() .map_err(|(e, _)| e.into()) - .and_then(move |(connec, _)| Listener::new(connec.unwrap().0)) + .and_then(move |(connec, _)| Listener::new(connec.unwrap())) .and_then(|l| l.into_future().map_err(|(e, _)| e)) .and_then(|(msg, rest)| { let proto = match msg { @@ -56,7 +53,7 @@ fn negotiate_with_self_succeeds() { rest.send(ListenerToDialerMessage::ProtocolAck { name: proto }) }); - let client = TcpStream::connect(&listener_addr, &core.handle()) + let client = TcpStream::connect(&listener_addr) .from_err() .and_then(move |stream| Dialer::new(stream)) .and_then(move |dialer| { @@ -73,20 +70,18 @@ fn negotiate_with_self_succeeds() { Ok(()) }); - core.run(server.join(client)).unwrap(); + tokio_current_thread::block_on_all(server.join(client)).unwrap(); } #[test] fn select_proto_basic() { - let mut core = Core::new().unwrap(); - - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener_addr = listener.local_addr().unwrap(); let server = listener .incoming() .into_future() - .map(|s| s.0.unwrap().0) + .map(|s| s.0.unwrap()) .map_err(|(e, _)| e.into()) .and_then(move |connec| { let protos = vec![ @@ -96,7 +91,7 @@ fn select_proto_basic() { listener_select_proto(connec, protos).map(|r| r.0) }); - let client = TcpStream::connect(&listener_addr, &core.handle()) + let client = TcpStream::connect(&listener_addr) .from_err() .and_then(move |connec| { let protos = vec![ @@ -106,22 +101,20 @@ fn select_proto_basic() { dialer_select_proto(connec, protos).map(|r| r.0) }); - let (dialer_chosen, listener_chosen) = core.run(client.join(server)).unwrap(); + let (dialer_chosen, listener_chosen) = tokio_current_thread::block_on_all(client.join(server)).unwrap(); assert_eq!(dialer_chosen, 3); assert_eq!(listener_chosen, 1); } #[test] fn no_protocol_found() { - let mut core = Core::new().unwrap(); - - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener_addr = listener.local_addr().unwrap(); let server = listener .incoming() .into_future() - .map(|s| s.0.unwrap().0) + .map(|s| s.0.unwrap()) .map_err(|(e, _)| e.into()) .and_then(move |connec| { let protos = vec![ @@ -131,7 +124,7 @@ fn no_protocol_found() { listener_select_proto(connec, protos).map(|r| r.0) }); - let client = TcpStream::connect(&listener_addr, &core.handle()) + let client = TcpStream::connect(&listener_addr) .from_err() .and_then(move |connec| { let protos = vec![ @@ -141,7 +134,7 @@ fn no_protocol_found() { dialer_select_proto(connec, protos).map(|r| r.0) }); - match core.run(client.join(server)) { + match tokio_current_thread::block_on_all(client.join(server)) { Err(ProtocolChoiceError::NoProtocolFound) => (), _ => panic!(), } @@ -149,15 +142,13 @@ fn no_protocol_found() { #[test] fn select_proto_parallel() { - let mut core = Core::new().unwrap(); - - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener_addr = listener.local_addr().unwrap(); let server = listener .incoming() .into_future() - .map(|s| s.0.unwrap().0) + .map(|s| s.0.unwrap()) .map_err(|(e, _)| e.into()) .and_then(move |connec| { let protos = vec![ @@ -167,7 +158,7 @@ fn select_proto_parallel() { listener_select_proto(connec, protos).map(|r| r.0) }); - let client = TcpStream::connect(&listener_addr, &core.handle()) + let client = TcpStream::connect(&listener_addr) .from_err() .and_then(move |connec| { let protos = vec![ @@ -177,22 +168,20 @@ fn select_proto_parallel() { dialer_select_proto_parallel(connec, protos).map(|r| r.0) }); - let (dialer_chosen, listener_chosen) = core.run(client.join(server)).unwrap(); + let (dialer_chosen, listener_chosen) = tokio_current_thread::block_on_all(client.join(server)).unwrap(); assert_eq!(dialer_chosen, 3); assert_eq!(listener_chosen, 1); } #[test] fn select_proto_serial() { - let mut core = Core::new().unwrap(); - - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener_addr = listener.local_addr().unwrap(); let server = listener .incoming() .into_future() - .map(|s| s.0.unwrap().0) + .map(|s| s.0.unwrap()) .map_err(|(e, _)| e.into()) .and_then(move |connec| { let protos = vec![ @@ -202,14 +191,14 @@ fn select_proto_serial() { listener_select_proto(connec, protos).map(|r| r.0) }); - let client = TcpStream::connect(&listener_addr, &core.handle()) + let client = TcpStream::connect(&listener_addr) .from_err() .and_then(move |connec| { let protos = vec![(Bytes::from("/proto3"), 2), (Bytes::from("/proto2"), 3)].into_iter(); dialer_select_proto_serial(connec, protos).map(|r| r.0) }); - let (dialer_chosen, listener_chosen) = core.run(client.join(server)).unwrap(); + let (dialer_chosen, listener_chosen) = tokio_current_thread::block_on_all(client.join(server)).unwrap(); assert_eq!(dialer_chosen, 3); assert_eq!(listener_chosen, 1); } diff --git a/ping/Cargo.toml b/ping/Cargo.toml index cb6f0eb7..4e173a30 100644 --- a/ping/Cargo.toml +++ b/ping/Cargo.toml @@ -17,4 +17,5 @@ tokio-io = "0.1" [dev-dependencies] libp2p-tcp-transport = { path = "../tcp-transport" } -tokio-core = "0.1" +tokio-current-thread = "0.1" +tokio-tcp = "0.1" diff --git a/ping/README.md b/ping/README.md index 0f807a60..454458b8 100644 --- a/ping/README.md +++ b/ping/README.md @@ -33,15 +33,13 @@ extern crate futures; extern crate libp2p_ping; extern crate libp2p_core; extern crate libp2p_tcp_transport; -extern crate tokio_core; +extern crate tokio_current_thread; use futures::Future; use libp2p_ping::Ping; use libp2p_core::Transport; -let mut core = tokio_core::reactor::Core::new().unwrap(); - -let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) +let ping_finished_future = libp2p_tcp_transport::TcpConfig::new() .with_upgrade(Ping) .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!()) .and_then(|((mut pinger, service), _)| { @@ -49,6 +47,6 @@ let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) }); // Runs until the ping arrives. -core.run(ping_finished_future).unwrap(); +tokio_current_thread::block_on_all(ping_finished_future).unwrap(); ``` diff --git a/ping/src/lib.rs b/ping/src/lib.rs index c914bb2e..067999d2 100644 --- a/ping/src/lib.rs +++ b/ping/src/lib.rs @@ -56,16 +56,14 @@ //! extern crate libp2p_ping; //! extern crate libp2p_core; //! extern crate libp2p_tcp_transport; -//! extern crate tokio_core; +//! extern crate tokio_current_thread; //! //! use futures::Future; //! use libp2p_ping::Ping; //! use libp2p_core::Transport; //! //! # fn main() { -//! let mut core = tokio_core::reactor::Core::new().unwrap(); -//! -//! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) +//! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new() //! .with_upgrade(Ping) //! .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!()) //! .and_then(|((mut pinger, service), _)| { @@ -73,7 +71,7 @@ //! }); //! //! // Runs until the ping arrives. -//! core.run(ping_finished_future).unwrap(); +//! tokio_current_thread::block_on_all(ping_finished_future).unwrap(); //! # } //! ``` //! @@ -283,11 +281,11 @@ impl Encoder for Codec { #[cfg(test)] mod tests { - extern crate tokio_core; + extern crate tokio_current_thread; + extern crate tokio_tcp; - use self::tokio_core::net::TcpListener; - use self::tokio_core::net::TcpStream; - use self::tokio_core::reactor::Core; + use self::tokio_tcp::TcpListener; + use self::tokio_tcp::TcpStream; use super::Ping; use futures::future::{self, join_all}; use futures::Future; @@ -297,9 +295,7 @@ mod tests { #[test] fn ping_pong() { - let mut core = Core::new().unwrap(); - - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener_addr = listener.local_addr().unwrap(); let server = listener @@ -308,7 +304,7 @@ mod tests { .map_err(|(e, _)| e.into()) .and_then(|(c, _)| { Ping.upgrade( - c.unwrap().0, + c.unwrap(), (), Endpoint::Listener, future::ok::("/ip4/127.0.0.1/tcp/10000".parse().unwrap()), @@ -322,7 +318,7 @@ mod tests { .map_err(|_| panic!()) }); - let client = TcpStream::connect(&listener_addr, &core.handle()) + let client = TcpStream::connect(&listener_addr) .map_err(|e| e.into()) .and_then(|c| { Ping.upgrade( @@ -340,15 +336,13 @@ mod tests { .map_err(|_| panic!()) }); - core.run(server.join(client)).unwrap(); + tokio_current_thread::block_on_all(server.join(client)).unwrap(); } #[test] fn multipings() { // Check that we can send multiple pings in a row and it will still work. - let mut core = Core::new().unwrap(); - - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener_addr = listener.local_addr().unwrap(); let server = listener @@ -357,7 +351,7 @@ mod tests { .map_err(|(e, _)| e.into()) .and_then(|(c, _)| { Ping.upgrade( - c.unwrap().0, + c.unwrap(), (), Endpoint::Listener, future::ok::("/ip4/127.0.0.1/tcp/10000".parse().unwrap()), @@ -365,7 +359,7 @@ mod tests { }) .and_then(|((_, service), _)| service.map_err(|_| panic!())); - let client = TcpStream::connect(&listener_addr, &core.handle()) + let client = TcpStream::connect(&listener_addr) .map_err(|e| e.into()) .and_then(|c| { Ping.upgrade( @@ -386,6 +380,6 @@ mod tests { .map_err(|_| panic!()) }); - core.run(server.select(client)).unwrap_or_else(|_| panic!()); + tokio_current_thread::block_on_all(server.select(client)).unwrap_or_else(|_| panic!()); } } diff --git a/secio/Cargo.toml b/secio/Cargo.toml index fc85114f..586936fc 100644 --- a/secio/Cargo.toml +++ b/secio/Cargo.toml @@ -24,4 +24,5 @@ secp256k1 = ["eth-secp256k1"] [dev-dependencies] libp2p-tcp-transport = { path = "../tcp-transport" } -tokio-core = "0.1.6" +tokio-current-thread = "0.1" +tokio-tcp = "0.1" diff --git a/secio/README.md b/secio/README.md index 5a50baaa..3d8f3bf1 100644 --- a/secio/README.md +++ b/secio/README.md @@ -10,7 +10,7 @@ through it. ```rust extern crate futures; -extern crate tokio_core; +extern crate tokio_current_thread; extern crate tokio_io; extern crate libp2p_core; extern crate libp2p_secio; @@ -25,7 +25,7 @@ use tokio_io::io::write_all; let mut core = Core::new().unwrap(); -let transport = TcpConfig::new(core.handle()) +let transport = TcpConfig::new() .with_upgrade({ # let private_key = b""; //let private_key = include_bytes!("test-rsa-private-key.pk8"); @@ -44,7 +44,7 @@ let future = transport.dial("/ip4/127.0.0.1/tcp/12345".parse::().unwr write_all(connection, "hello world") }); -core.run(future).unwrap(); +tokio_current_thread::block_on_all(future).unwrap(); ``` # Manual usage diff --git a/secio/src/codec/mod.rs b/secio/src/codec/mod.rs index ab0f2745..ae22ba8b 100644 --- a/secio/src/codec/mod.rs +++ b/secio/src/codec/mod.rs @@ -59,10 +59,10 @@ where #[cfg(test)] mod tests { - extern crate tokio_core; - use self::tokio_core::net::TcpListener; - use self::tokio_core::net::TcpStream; - use self::tokio_core::reactor::Core; + extern crate tokio_current_thread; + extern crate tokio_tcp; + use self::tokio_tcp::TcpListener; + use self::tokio_tcp::TcpStream; use super::full_codec; use super::DecoderMiddleware; use super::EncoderMiddleware; @@ -111,8 +111,7 @@ mod tests { let data_sent = encoder.send(BytesMut::from(data.to_vec())).from_err(); let data_received = decoder.into_future().map(|(n, _)| n).map_err(|(e, _)| e); - let mut core = Core::new().unwrap(); - let (_, decoded) = core.run(data_sent.join(data_received)) + let (_, decoded) = tokio_current_thread::block_on_all(data_sent.join(data_received)) .map_err(|_| ()) .unwrap(); assert_eq!(decoded.unwrap(), data); @@ -120,8 +119,6 @@ mod tests { #[test] fn full_codec_encode_then_decode() { - let mut core = Core::new().unwrap(); - let cipher_key: [u8; 32] = rand::random(); let cipher_key_clone = cipher_key.clone(); let hmac_key: [u8; 32] = rand::random(); @@ -129,12 +126,12 @@ mod tests { let data = b"hello world"; let data_clone = data.clone(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener_addr = listener.local_addr().unwrap(); let server = listener.incoming().into_future().map_err(|(e, _)| e).map( move |(connec, _)| { - let connec = Framed::new(connec.unwrap().0); + let connec = Framed::new(connec.unwrap()); full_codec( connec, @@ -152,7 +149,7 @@ mod tests { }, ); - let client = TcpStream::connect(&listener_addr, &core.handle()) + let client = TcpStream::connect(&listener_addr) .map_err(|e| e.into()) .map(move |stream| { let stream = Framed::new(stream); @@ -184,7 +181,7 @@ mod tests { .and_then(|server| server.into_future().map_err(|(e, _)| e.into())) .map(|recved| recved.0.unwrap().to_vec()); - let received = core.run(fin).unwrap(); + let received = tokio_current_thread::block_on_all(fin).unwrap(); assert_eq!(received, data); } } diff --git a/secio/src/handshake.rs b/secio/src/handshake.rs index 883ebb8a..a64b568f 100644 --- a/secio/src/handshake.rs +++ b/secio/src/handshake.rs @@ -564,10 +564,10 @@ fn stretch_key(key: &SigningKey, result: &mut [u8]) { #[cfg(test)] mod tests { - extern crate tokio_core; - use self::tokio_core::net::TcpListener; - use self::tokio_core::net::TcpStream; - use self::tokio_core::reactor::Core; + extern crate tokio_current_thread; + extern crate tokio_tcp; + use self::tokio_tcp::TcpListener; + use self::tokio_tcp::TcpStream; use super::handshake; use super::stretch_key; use futures::Future; @@ -617,22 +617,20 @@ mod tests { } fn handshake_with_self_succeeds(key1: SecioKeyPair, key2: SecioKeyPair) { - let mut core = Core::new().unwrap(); - - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener_addr = listener.local_addr().unwrap(); let server = listener .incoming() .into_future() .map_err(|(e, _)| e.into()) - .and_then(move |(connec, _)| handshake(connec.unwrap().0, key1)); + .and_then(move |(connec, _)| handshake(connec.unwrap(), key1)); - let client = TcpStream::connect(&listener_addr, &core.handle()) + let client = TcpStream::connect(&listener_addr) .map_err(|e| e.into()) .and_then(move |stream| handshake(stream, key2)); - core.run(server.join(client)).unwrap(); + tokio_current_thread::block_on_all(server.join(client)).unwrap(); } #[test] diff --git a/secio/src/lib.rs b/secio/src/lib.rs index 48e95664..9bae4e77 100644 --- a/secio/src/lib.rs +++ b/secio/src/lib.rs @@ -30,7 +30,7 @@ //! //! ```no_run //! extern crate futures; -//! extern crate tokio_core; +//! extern crate tokio_current_thread; //! extern crate tokio_io; //! extern crate libp2p_core; //! extern crate libp2p_secio; @@ -41,12 +41,9 @@ //! use libp2p_secio::{SecioConfig, SecioKeyPair, SecioOutput}; //! use libp2p_core::{Multiaddr, Transport, upgrade}; //! use libp2p_tcp_transport::TcpConfig; -//! use tokio_core::reactor::Core; //! use tokio_io::io::write_all; //! -//! let mut core = Core::new().unwrap(); -//! -//! let transport = TcpConfig::new(core.handle()) +//! let transport = TcpConfig::new() //! .with_upgrade({ //! # let private_key = b""; //! //let private_key = include_bytes!("test-rsa-private-key.pk8"); @@ -67,7 +64,7 @@ //! write_all(connection, "hello world") //! }); //! -//! core.run(future).unwrap(); +//! tokio_current_thread::block_on_all(future).unwrap(); //! # } //! ``` //! diff --git a/tcp-transport/Cargo.toml b/tcp-transport/Cargo.toml index fe16f6bb..be40e303 100644 --- a/tcp-transport/Cargo.toml +++ b/tcp-transport/Cargo.toml @@ -8,5 +8,8 @@ libp2p-core = { path = "../core" } log = "0.4.1" futures = "0.1" multiaddr = { path = "../multiaddr" } -tokio-core = "0.1" +tokio-tcp = "0.1" + +[dev-dependencies] +tokio-current-thread = "0.1" tokio-io = "0.1" diff --git a/tcp-transport/README.md b/tcp-transport/README.md index 4a2064fa..7e902374 100644 --- a/tcp-transport/README.md +++ b/tcp-transport/README.md @@ -6,21 +6,17 @@ Uses [the *tokio* library](https://tokio.rs). ## Usage -Create [a tokio `Core`](https://docs.rs/tokio-core/0.1/tokio_core/reactor/struct.Core.html), -then grab a handle by calling the `handle()` method on it, then create a `TcpConfig` and pass -the handle. - Example: ```rust extern crate libp2p_tcp_transport; -extern crate tokio_core; +extern crate tokio_current_thread; use libp2p_tcp_transport::TcpConfig; use tokio_core::reactor::Core; let mut core = Core::new().unwrap(); -let tcp = TcpConfig::new(core.handle()); +let tcp = TcpConfig::new(); ``` The `TcpConfig` structs implements the `Transport` trait of the `swarm` library. See the diff --git a/tcp-transport/src/lib.rs b/tcp-transport/src/lib.rs index 5e7b20d7..9e069540 100644 --- a/tcp-transport/src/lib.rs +++ b/tcp-transport/src/lib.rs @@ -27,22 +27,14 @@ //! //! # Usage //! -//! Create [a tokio `Core`](https://docs.rs/tokio-core/0.1/tokio_core/reactor/struct.Core.html), -//! then grab a handle by calling the `handle()` method on it, then create a `TcpConfig` and pass -//! the handle. -//! //! Example: //! //! ``` //! extern crate libp2p_tcp_transport; -//! extern crate tokio_core; -//! //! use libp2p_tcp_transport::TcpConfig; -//! use tokio_core::reactor::Core; //! //! # fn main() { -//! let mut core = Core::new().unwrap(); -//! let tcp = TcpConfig::new(core.handle()); +//! let tcp = TcpConfig::new(); //! # } //! ``` //! @@ -54,7 +46,11 @@ extern crate libp2p_core as swarm; #[macro_use] extern crate log; extern crate multiaddr; -extern crate tokio_core; +extern crate tokio_tcp; + +#[cfg(test)] +extern crate tokio_current_thread; +#[cfg(test)] extern crate tokio_io; use futures::future::{self, Future, FutureResult}; @@ -64,25 +60,21 @@ use std::io::Error as IoError; use std::iter; use std::net::SocketAddr; use swarm::Transport; -use tokio_core::net::{TcpListener, TcpStream}; -use tokio_core::reactor::Handle; +use tokio_tcp::{TcpListener, TcpStream}; /// Represents the configuration for a TCP/IP transport capability for libp2p. /// -/// Each connection created by this config is tied to a tokio reactor. The TCP sockets created by -/// libp2p will need to be progressed by running the futures and streams obtained by libp2p -/// through the tokio reactor. +/// The TCP sockets created by libp2p will need to be progressed by running the futures and streams +/// obtained by libp2p through the tokio reactor. #[derive(Debug, Clone)] pub struct TcpConfig { - event_loop: Handle, } impl TcpConfig { - /// Creates a new configuration object for TCP/IP. The `Handle` is a tokio reactor the - /// connections will be created with. + /// Creates a new configuration object for TCP/IP. #[inline] - pub fn new(handle: Handle) -> TcpConfig { - TcpConfig { event_loop: handle } + pub fn new() -> TcpConfig { + TcpConfig {} } } @@ -95,7 +87,7 @@ impl Transport for TcpConfig { fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { - let listener = TcpListener::bind(&socket_addr, &self.event_loop); + let listener = TcpListener::bind(&socket_addr); // We need to build the `Multiaddr` to return from this function. If an error happened, // just return the original multiaddr. let new_addr = match listener { @@ -115,9 +107,13 @@ impl Transport for TcpConfig { let future = future::result(listener) .map(|listener| { // Pull out a stream of sockets for incoming connections - listener.incoming().map(|(sock, addr)| { - let addr = addr.to_multiaddr() - .expect("generating a multiaddr from a socket addr never fails"); + listener.incoming().map(|sock| { + let addr = match sock.peer_addr() { + Ok(addr) => addr.to_multiaddr() + .expect("generating a multiaddr from a socket addr never fails"), + Err(err) => return future::err(err), + }; + debug!("Incoming connection from {}", addr); future::ok((sock, future::ok(addr))) }) @@ -135,7 +131,7 @@ impl Transport for TcpConfig { // If so, we instantly refuse dialing instead of going through the kernel. if socket_addr.port() != 0 && !socket_addr.ip().is_unspecified() { debug!("Dialing {}", addr); - let fut = TcpStream::connect(&socket_addr, &self.event_loop) + let fut = TcpStream::connect(&socket_addr) .map(|t| (t, future::ok(addr))); Ok(Box::new(fut) as Box<_>) } else { @@ -205,7 +201,7 @@ mod tests { use std; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use swarm::Transport; - use tokio_core::reactor::Core; + use tokio_current_thread; use tokio_io; #[test] @@ -258,10 +254,8 @@ mod tests { use std::io::Write; std::thread::spawn(move || { - let mut core = Core::new().unwrap(); let addr = "/ip4/127.0.0.1/tcp/12345".parse::().unwrap(); - let tcp = TcpConfig::new(core.handle()); - let handle = core.handle(); + let tcp = TcpConfig::new(); let listener = tcp.listen_on(addr).unwrap().0.for_each(|sock| { sock.and_then(|(sock, _)| { // Define what to do with the socket that just connected to us @@ -271,37 +265,32 @@ mod tests { .map_err(|err| panic!("IO error {:?}", err)); // Spawn the future as a concurrent task - handle.spawn(handle_conn); + tokio_current_thread::spawn(handle_conn); Ok(()) }) }); - core.run(listener).unwrap(); + tokio_current_thread::block_on_all(listener).unwrap(); }); std::thread::sleep(std::time::Duration::from_millis(100)); let addr = "/ip4/127.0.0.1/tcp/12345".parse::().unwrap(); - let mut core = Core::new().unwrap(); - let tcp = TcpConfig::new(core.handle()); + let tcp = TcpConfig::new(); // Obtain a future socket through dialing let socket = tcp.dial(addr.clone()).unwrap(); // Define what to do with the socket once it's obtained - let action = socket.then(|sock| match sock { - Ok((mut s, _)) => { - let written = s.write(&[0x1, 0x2, 0x3]).unwrap(); - Ok(written) - } - Err(x) => Err(x), + let action = socket.then(|sock| -> Result<(), ()> { + sock.unwrap().0.write(&[0x1, 0x2, 0x3]).unwrap(); + Ok(()) }); // Execute the future in our event loop - core.run(action).unwrap(); + tokio_current_thread::block_on_all(action).unwrap(); std::thread::sleep(std::time::Duration::from_millis(100)); } #[test] fn replace_port_0_in_returned_multiaddr_ipv4() { - let core = Core::new().unwrap(); - let tcp = TcpConfig::new(core.handle()); + let tcp = TcpConfig::new(); let addr = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); assert!(addr.to_string().contains("tcp/0")); @@ -312,8 +301,7 @@ mod tests { #[test] fn replace_port_0_in_returned_multiaddr_ipv6() { - let core = Core::new().unwrap(); - let tcp = TcpConfig::new(core.handle()); + let tcp = TcpConfig::new(); let addr: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap(); assert!(addr.to_string().contains("tcp/0")); @@ -324,8 +312,7 @@ mod tests { #[test] fn larger_addr_denied() { - let core = Core::new().unwrap(); - let tcp = TcpConfig::new(core.handle()); + let tcp = TcpConfig::new(); let addr = "/ip4/127.0.0.1/tcp/12345/tcp/12345" .parse::() @@ -335,8 +322,7 @@ mod tests { #[test] fn nat_traversal() { - let core = Core::new().unwrap(); - let tcp = TcpConfig::new(core.handle()); + let tcp = TcpConfig::new(); let server = "/ip4/127.0.0.1/tcp/10000".parse::().unwrap(); let observed = "/ip4/80.81.82.83/tcp/25000".parse::().unwrap(); diff --git a/websocket/Cargo.toml b/websocket/Cargo.toml index 4a63512a..11727aae 100644 --- a/websocket/Cargo.toml +++ b/websocket/Cargo.toml @@ -19,4 +19,4 @@ stdweb = { version = "0.1.3", default-features = false } [target.'cfg(not(target_os = "emscripten"))'.dev-dependencies] libp2p-tcp-transport = { path = "../tcp-transport" } -tokio-core = "0.1" +tokio-current-thread = "0.1" diff --git a/websocket/README.md b/websocket/README.md index 2db15a7d..9f24ca6b 100644 --- a/websocket/README.md +++ b/websocket/README.md @@ -33,7 +33,7 @@ This underlying transport must be put inside a `WsConfig` object through the extern crate libp2p_core; extern crate libp2p_tcp_transport; extern crate libp2p_websocket; -extern crate tokio_core; +extern crate tokio_current_thread; use libp2p_core::{Multiaddr, Transport}; use libp2p_tcp_transport::TcpConfig; @@ -41,6 +41,6 @@ use libp2p_websocket::WsConfig; use tokio_core::reactor::Core; let core = Core::new().unwrap(); -let ws_config = WsConfig::new(TcpConfig::new(core.handle())); +let ws_config = WsConfig::new(TcpConfig::new()); let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); ``` diff --git a/websocket/src/desktop.rs b/websocket/src/desktop.rs index a0734291..9a741bf0 100644 --- a/websocket/src/desktop.rs +++ b/websocket/src/desktop.rs @@ -289,8 +289,7 @@ fn client_addr_to_ws(client_addr: &Multiaddr, is_wss: bool) -> String { #[cfg(test)] mod tests { extern crate libp2p_tcp_transport as tcp; - extern crate tokio_core; - use self::tokio_core::reactor::Core; + extern crate tokio_current_thread; use futures::{Future, Stream}; use multiaddr::Multiaddr; use swarm::Transport; @@ -298,8 +297,7 @@ mod tests { #[test] fn dialer_connects_to_listener_ipv4() { - let mut core = Core::new().unwrap(); - let ws_config = WsConfig::new(tcp::TcpConfig::new(core.handle())); + let ws_config = WsConfig::new(tcp::TcpConfig::new()); let (listener, addr) = ws_config .clone() @@ -317,13 +315,12 @@ mod tests { .select(dialer) .map_err(|(e, _)| e) .and_then(|(_, n)| n); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); } #[test] fn dialer_connects_to_listener_ipv6() { - let mut core = Core::new().unwrap(); - let ws_config = WsConfig::new(tcp::TcpConfig::new(core.handle())); + let ws_config = WsConfig::new(tcp::TcpConfig::new()); let (listener, addr) = ws_config .clone() @@ -341,13 +338,12 @@ mod tests { .select(dialer) .map_err(|(e, _)| e) .and_then(|(_, n)| n); - core.run(future).unwrap(); + tokio_current_thread::block_on_all(future).unwrap(); } #[test] fn nat_traversal() { - let core = Core::new().unwrap(); - let ws_config = WsConfig::new(tcp::TcpConfig::new(core.handle())); + let ws_config = WsConfig::new(tcp::TcpConfig::new()); { let server = "/ip4/127.0.0.1/tcp/10000/ws".parse::().unwrap(); diff --git a/websocket/src/lib.rs b/websocket/src/lib.rs index 0a22e5c0..c5ac4cf6 100644 --- a/websocket/src/lib.rs +++ b/websocket/src/lib.rs @@ -58,16 +58,13 @@ //! extern crate libp2p_core; //! extern crate libp2p_tcp_transport; //! extern crate libp2p_websocket; -//! extern crate tokio_core; //! //! use libp2p_core::{Multiaddr, Transport}; //! use libp2p_tcp_transport::TcpConfig; //! use libp2p_websocket::WsConfig; -//! use tokio_core::reactor::Core; //! //! # fn main() { -//! let core = Core::new().unwrap(); -//! let ws_config = WsConfig::new(TcpConfig::new(core.handle())); +//! let ws_config = WsConfig::new(TcpConfig::new()); //! # return; //! let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); //! # }