From 3bcdfb42634854a56347dfbaf62434ce0172a8f2 Mon Sep 17 00:00:00 2001 From: Heyang Zhou Date: Mon, 13 May 2019 06:14:28 -0700 Subject: [PATCH] Asynchronous networking extension. --- Cargo.lock | 83 ++-- Cargo.toml | 2 +- examples/kernel/hello_world/src/lib.rs | 14 - .../Cargo.toml | 6 +- examples/wasi-networking/src/main.rs | 56 +++ lib/kwasm-net/Cargo.toml | 9 + lib/kwasm-net/src/lib.rs | 420 ++++++++++++++++++ 7 files changed, 533 insertions(+), 57 deletions(-) delete mode 100644 examples/kernel/hello_world/src/lib.rs rename examples/{kernel/hello_world => wasi-networking}/Cargo.toml (62%) create mode 100644 examples/wasi-networking/src/main.rs create mode 100644 lib/kwasm-net/Cargo.toml create mode 100644 lib/kwasm-net/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 3b23fd683..04a7da8cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -117,8 +117,8 @@ dependencies = [ "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "peeking_take_while 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "which 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -213,12 +213,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -537,9 +537,9 @@ dependencies = [ "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -619,9 +619,9 @@ name = "failure_derive" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", "synstructure 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -772,10 +772,6 @@ dependencies = [ "unicode-segmentation 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "hello_world" -version = "0.1.0" - [[package]] name = "hex" version = "0.3.2" @@ -875,8 +871,8 @@ name = "inkwell_internal_macros" version = "0.1.0" source = "git+https://github.com/wasmerio/inkwell?branch=llvm7-0#a14e62977504ef574dc2e933edc559cc79781ca7" dependencies = [ - "quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -918,6 +914,10 @@ dependencies = [ "wasmer-runtime-core 0.3.0", ] +[[package]] +name = "kwasm-net" +version = "0.1.0" + [[package]] name = "lazy_static" version = "1.3.0" @@ -1344,7 +1344,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "proc-macro2" -version = "0.4.27" +version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1362,10 +1362,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "quote" -version = "0.6.11" +version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1667,9 +1667,9 @@ name = "scroll_derive" version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1736,9 +1736,9 @@ name = "serde_derive" version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1815,9 +1815,9 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1832,11 +1832,11 @@ dependencies = [ [[package]] name = "syn" -version = "0.15.30" +version = "0.15.34" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1853,9 +1853,9 @@ name = "synstructure" version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)", - "quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)", - "syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)", "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2240,6 +2240,13 @@ dependencies = [ "try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "wasi-networking" +version = "0.1.0" +dependencies = [ + "kwasm-net 0.1.0", +] + [[package]] name = "wasmer" version = "0.4.0" @@ -2740,10 +2747,10 @@ dependencies = [ "checksum pkg-config 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "676e8eb2b1b4c9043511a9b7bea0915320d7e502b0a079fb03f9635a5252b18c" "checksum plain 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" "checksum podio 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "780fb4b6698bbf9cf2444ea5d22411cef2953f0824b98f33cf454ec5615645bd" -"checksum proc-macro2 0.4.27 (registry+https://github.com/rust-lang/crates.io-index)" = "4d317f9caece796be1980837fd5cb3dfec5613ebdb04ad0956deea83ce168915" +"checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" "checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" -"checksum quote 0.6.11 (registry+https://github.com/rust-lang/crates.io-index)" = "cdd8e04bd9c52e0342b406469d494fcb033be4bdbe5c606016defbb1681411e1" +"checksum quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)" = "faf4799c5d274f3868a4aae320a0a182cbd2baee377b378f080e16a23e9d80db" "checksum rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" "checksum rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" "checksum rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef" @@ -2796,7 +2803,7 @@ dependencies = [ "checksum structopt 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)" = "3d0760c312538987d363c36c42339b55f5ee176ea8808bbe4543d484a291c8d1" "checksum structopt-derive 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)" = "528aeb7351d042e6ffbc2a6fb76a86f9b622fdf7c25932798e7a82cb03bc94c6" "checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad" -"checksum syn 0.15.30 (registry+https://github.com/rust-lang/crates.io-index)" = "66c8865bf5a7cbb662d8b011950060b3c8743dca141b054bf7195b20d314d8e2" +"checksum syn 0.15.34 (registry+https://github.com/rust-lang/crates.io-index)" = "a1393e4a97a19c01e900df2aec855a29f71cf02c402e2f443b8d2747c25c5dbe" "checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6" "checksum synstructure 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)" = "73687139bf99285483c96ac0add482c3776528beac1d97d444f6e91f203a2015" "checksum take_mut 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" diff --git a/Cargo.toml b/Cargo.toml index 8849a4918..4b11cc7db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ wasmer-wasi = { path = "lib/wasi", optional = true } kwasm-loader = { path = "lib/kwasm-loader" } [workspace] -members = ["lib/clif-backend", "lib/singlepass-backend", "lib/runtime", "lib/runtime-abi", "lib/runtime-core", "lib/emscripten", "lib/spectests", "lib/win-exception-handler", "lib/runtime-c-api", "lib/llvm-backend", "lib/wasi", "lib/middleware-common", "lib/kwasm-loader", "examples/kernel/hello_world", "examples/pipe", "examples/plugin-for-example"] +members = ["lib/clif-backend", "lib/singlepass-backend", "lib/runtime", "lib/runtime-abi", "lib/runtime-core", "lib/emscripten", "lib/spectests", "lib/win-exception-handler", "lib/runtime-c-api", "lib/llvm-backend", "lib/wasi", "lib/middleware-common", "lib/kwasm-loader", "lib/kwasm-net", "examples/pipe", "examples/wasi-networking", "examples/plugin-for-example"] [build-dependencies] wabt = "0.7.2" diff --git a/examples/kernel/hello_world/src/lib.rs b/examples/kernel/hello_world/src/lib.rs deleted file mode 100644 index 43b7df332..000000000 --- a/examples/kernel/hello_world/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -extern "C" { - fn print_str(base: *const u8, len: usize) -> i32; -} - -#[no_mangle] -pub extern "C" fn main() -> i32 { - let v: Vec = (0..10).collect(); - let s = format!("Hello world from WebAssembly. Some heap allocated integers: {:?}", v); - let s = s.as_bytes(); - unsafe { - print_str(s.as_ptr(), s.len()); - } - return 0; -} diff --git a/examples/kernel/hello_world/Cargo.toml b/examples/wasi-networking/Cargo.toml similarity index 62% rename from examples/kernel/hello_world/Cargo.toml rename to examples/wasi-networking/Cargo.toml index b6d6f2fde..8057f9592 100644 --- a/examples/kernel/hello_world/Cargo.toml +++ b/examples/wasi-networking/Cargo.toml @@ -1,10 +1,8 @@ [package] -name = "hello_world" +name = "wasi-networking" version = "0.1.0" authors = ["Heyang Zhou "] edition = "2018" -[lib] -crate-type = ["cdylib"] - [dependencies] +kwasm-net = { path = "../../lib/kwasm-net" } \ No newline at end of file diff --git a/examples/wasi-networking/src/main.rs b/examples/wasi-networking/src/main.rs new file mode 100644 index 000000000..84a8bcf46 --- /dev/null +++ b/examples/wasi-networking/src/main.rs @@ -0,0 +1,56 @@ +#![feature(wasi_ext)] + +use kwasm_net::{Epoll, Tcp4Listener, TcpStream, schedule}; +use std::io::{Read, Write}; +use std::sync::Arc; + +fn do_echo(stream: Arc, buf: Vec) { + let stream2 = stream.clone(); + stream.read_async(buf, move |result| { + match result { + Ok(buf) => { + if buf.len() == 0 { + return; + } + let stream = stream2.clone(); + stream2.write_all_async(buf, move |result| { + match result { + Ok(buf) => { + schedule(|| { + do_echo(stream, buf); + }); + }, + Err(code) => { + println!("failed to write; code = {}", code); + } + } + }); + }, + Err(code) => { + println!("failed to read; code = {}", code); + } + } + }); +} + +fn main() { + let epoll = Arc::new(Epoll::new()); + let listener = Arc::new(Tcp4Listener::new("0.0.0.0", 2001, 128).unwrap()); + + listener.accept_async(epoll.clone(), |stream| { + match stream { + Ok(stream) => { + do_echo(stream, Vec::with_capacity(4096 * 4)); + Ok(()) + }, + Err(code) => { + println!("failed to accept; code = {}", code); + Err(()) + } + } + }); + println!("start epoll"); + unsafe { + epoll.run(); + } +} diff --git a/lib/kwasm-net/Cargo.toml b/lib/kwasm-net/Cargo.toml new file mode 100644 index 000000000..135a07c76 --- /dev/null +++ b/lib/kwasm-net/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "kwasm-net" +version = "0.1.0" +authors = ["Heyang Zhou "] +edition = "2018" + +[dependencies] +#runtime = "0.3.0-alpha.4" +#runtime-raw = "0.3.0-alpha.3" diff --git a/lib/kwasm-net/src/lib.rs b/lib/kwasm-net/src/lib.rs new file mode 100644 index 000000000..e6f779190 --- /dev/null +++ b/lib/kwasm-net/src/lib.rs @@ -0,0 +1,420 @@ +#![feature(wasi_ext)] + +use std::fs::File; +use std::os::wasi::io::FromRawFd; +use std::io::{Read, Write}; +use std::net::{Ipv4Addr, AddrParseError}; +use std::ops::{Deref, DerefMut}; +use std::task::{Waker, RawWaker, RawWakerVTable, Poll}; +use std::future::Future; +use std::sync::{Arc, Mutex}; +use std::cell::RefCell; + +const AF_INET: i32 = 2; +const SOCK_STREAM: i32 = 1; +const O_NONBLOCK: u32 = 2048; +const F_GETFL: i32 = 3; +const F_SETFL: i32 = 4; +const EPOLLIN: u32 = 1u32; +const EPOLLOUT: u32 = 4u32; +const EPOLLONESHOT: u32 = 1u32 << 30; +const EPOLLET: u32 = 1u32 << 31; +const EAGAIN: i32 = 11; +const EWOULDBLOCK: i32 = EAGAIN; +const EPOLL_CTL_ADD: i32 = 1; +const EPOLL_CTL_DEL: i32 = 2; + +#[link(wasm_import_module = "net")] +extern "C" { + fn _socket(family: i32, _type: i32, proto: i32) -> i32; + fn _bind(fd: i32, sa: *const SockaddrIn, sa_len: usize) -> i32; + fn _listen(fd: i32, backlog: i32) -> i32; + fn _accept4(fd: i32, sa: *mut SockaddrIn, sa_len: *mut usize, flags: u32) -> i32; + fn _sendto(fd: i32, buf: *const u8, buf_len: usize, flags: u32, addr: *const SockaddrIn, addr_len: usize) -> i32; + fn _recvfrom(fd: i32, buf: *mut u8, buf_len: usize, flags: u32, addr: *mut SockaddrIn, addr_len: *mut usize) -> i32; + fn _get_immediate_fd() -> i32; + fn _epoll_create() -> i32; + fn _epoll_ctl( + epfd: i32, + op: i32, + fd: i32, + event: *const EpollEvent, + ) -> i32; + fn _epoll_wait( + epfd: i32, + events: *mut EpollEvent, + maxevents: usize, + timeout: i32, + ) -> i32; + fn _fcntl( + fd: i32, + cmd: i32, + arg: u32, + ) -> i32; +} + +thread_local! { + static GLOBAL_EPOLL: RefCell>> = RefCell::new(None); +} + +struct AsyncState { + callback: Box, + _epoll: Arc, +} + +pub struct Epoll { + fd: i32, +} + +impl Epoll { + pub fn new() -> Epoll { + let fd = unsafe { + _epoll_create() + }; + assert!(fd >= 0); + Epoll { + fd: fd, + } + } + + pub unsafe fn run(self: Arc) -> ! { + GLOBAL_EPOLL.with(|x| { + *x.borrow_mut() = Some(self.clone()); + }); + let mut events: Vec = vec! [ EpollEvent::default(); 16 ]; + loop { + let events_len = events.len(); + let n_ready = unsafe { + _epoll_wait(self.fd, events.as_mut_ptr(), events_len, -1) + }; + assert!(n_ready >= 0); + //println!("n_ready = {}", n_ready); + for ev in &events[..n_ready as usize] { + if ev.events & (EPOLLIN | EPOLLOUT) != 0 { + //println!("Free event {:x} {:?}", ev.events, ev.data as usize as *mut AsyncState); + let state = unsafe { + Box::from_raw(ev.data as usize as *mut AsyncState) + }; + (state.callback)(); + //println!("After callback"); + } else { + println!("unknown event(s): 0x{:x}", ev.events); + } + } + } + } +} + +impl Drop for Epoll { + fn drop(&mut self) { + unsafe { + File::from_raw_fd(self.fd as _); + } + } +} + +#[derive(Copy, Clone, Debug)] +pub enum EpollDirection { + In, + Out +} + +pub fn schedule(f: F) { + let epoll = GLOBAL_EPOLL.with(|x| x.borrow().as_ref().unwrap().clone()); + let epfd = epoll.fd; + let imm_fd = unsafe { _get_immediate_fd() }; + assert!(imm_fd >= 0); + + let state = Box::new(AsyncState { + callback: Box::new(move || { + assert!(unsafe { + _epoll_ctl(epfd, EPOLL_CTL_DEL, imm_fd, ::std::ptr::null()) + } >= 0); + unsafe { File::from_raw_fd(imm_fd as _) }; + f(); + }), + _epoll: epoll, + }); + let ev = EpollEvent { + events: EPOLLIN | EPOLLET | EPOLLONESHOT, + data: Box::into_raw(state) as usize as _, + }; + let ret = unsafe { _epoll_ctl(epfd, EPOLL_CTL_ADD, imm_fd, &ev) }; + assert!(ret >= 0); +} + +fn get_async_io_payload Result + 'static, F: FnOnce(Result) + 'static>( + epoll: Arc, + fd: i32, + direction: EpollDirection, + poll_action: P, + on_ready: F, +) -> Box { + __get_async_io_payload(epoll, fd, direction, poll_action, on_ready, false) +} + +fn __get_async_io_payload Result + 'static, F: FnOnce(Result) + 'static>( + epoll: Arc, + fd: i32, + direction: EpollDirection, + mut poll_action: P, + on_ready: F, + registered: bool, +) -> Box { + let epfd = epoll.fd; + Box::new(move || { + //println!("async io payload"); + let ret = poll_action(fd); + //println!("async io payload (after poll_action)"); + match ret { + Err(x) if x == -EAGAIN || x == -EWOULDBLOCK => { + let state = Box::new(AsyncState { + callback: __get_async_io_payload(epoll.clone(), fd, direction, poll_action, on_ready, true), + _epoll: epoll, + }); + let direction_flag = match direction { + EpollDirection::In => EPOLLIN, + EpollDirection::Out => EPOLLOUT, + }; + let ev = EpollEvent { + events: direction_flag | EPOLLET | EPOLLONESHOT, + data: Box::into_raw(state) as usize as _, + }; + //println!("Alloc event {:?}", ev.data as usize as *mut AsyncState); + let ret = unsafe { _epoll_ctl( + epfd, + EPOLL_CTL_ADD, + fd, + &ev + ) }; + assert!(ret >= 0); + }, + x => { + if registered { + assert!(unsafe { _epoll_ctl( + epfd, + EPOLL_CTL_DEL, + fd, + ::std::ptr::null(), + ) } >= 0); + } + on_ready(x); // fast path + } + } + }) +} + +#[repr(C)] +#[derive(Copy, Clone)] +struct SockaddrIn { + sin_family: u16, // e.g. AF_INET + sin_port: u16, // e.g. htons(3490) + sin_addr: InAddr, + sin_zero: [u8; 8], +} + +#[repr(C)] +#[derive(Copy, Clone)] +struct InAddr { + s_addr: u32, +} + +#[repr(C)] +#[derive(Copy, Clone, Default)] +struct EpollEvent { + events: u32, + data: u64, +} + +fn invert_byteorder_u16(x: u16) -> u16 { + unsafe { + use std::mem::transmute; + let buf: [u8; 2] = transmute(x); + let out: [u8; 2] = [buf[1], buf[0]]; + transmute(out) + } +} + +#[derive(Debug)] +pub enum SocketError { + AddrParse(AddrParseError), + SocketCreate, + Bind, + Listen, + Accept, + Message(String), +} + +pub struct Tcp4Listener { + addr: Ipv4Addr, + port: u16, + fd: i32, +} + +impl Tcp4Listener { + pub fn new>( + addr: A, + port: u16, + backlog: u32, + ) -> Result { + let addr: Ipv4Addr = addr.as_ref().parse().map_err(SocketError::AddrParse)?; + let sa = SockaddrIn { + sin_family: AF_INET as _, + sin_port: invert_byteorder_u16(port), + sin_addr: InAddr { s_addr: unsafe { ::std::mem::transmute(addr.octets()) } }, + sin_zero: [0; 8], + }; + let fd = unsafe { + _socket(AF_INET, SOCK_STREAM, 0) + }; + if fd < 0 { + return Err(SocketError::SocketCreate); + } + if(unsafe { + _bind(fd, &sa, ::std::mem::size_of::()) + } < 0) { + return Err(SocketError::Bind); + } + if(unsafe { + _listen(fd, backlog as _) + } < 0) { + return Err(SocketError::Listen); + } + + unsafe { + let mut socket_flags = _fcntl(fd, F_GETFL, 0) as u32; + socket_flags |= O_NONBLOCK; + assert!(_fcntl(fd, F_SETFL, socket_flags) >= 0); + } + + Ok(Tcp4Listener { + addr: addr, + port: port, + fd: fd, + }) + } + + pub fn accept_async, i32>) -> Result<(), ()> + 'static>(self: Arc, ep: Arc, cb: F) { + let ep2 = ep.clone(); + (get_async_io_payload(ep.clone(), self.fd, EpollDirection::In, move |fd| -> Result, i32> { + let mut incoming_sa: SockaddrIn = unsafe { ::std::mem::uninitialized() }; + let mut real_len: usize = ::std::mem::size_of::(); + let conn = unsafe { + _accept4(fd, &mut incoming_sa, &mut real_len, O_NONBLOCK) + }; + if conn >= 0 { + unsafe { + let mut socket_flags = _fcntl(conn, F_GETFL, 0) as u32; + socket_flags |= O_NONBLOCK; + assert!(_fcntl(conn, F_SETFL, socket_flags) >= 0); + } + Ok(Arc::new(TcpStream { + fd: conn, + epoll: ep.clone(), + })) + } else { + Err(conn) + } + }, move |x| { + if let Ok(()) = cb(x) { + self.accept_async(ep2, cb); + } + }))(); + } +} + +pub struct TcpStream { + fd: i32, + epoll: Arc, +} + +impl TcpStream { + pub fn __write_async(self: Arc, data: Vec, offset: usize, cb: impl FnOnce(Result<(usize, Vec), i32>) + 'static) { + let mut data = Some(data); + + (get_async_io_payload( + self.epoll.clone(), + self.fd, + EpollDirection::Out, + move |fd| -> Result<(usize, Vec), i32> { + let _data = data.as_ref().unwrap(); + let _data = &_data[offset..]; + let ret = unsafe { + _sendto(fd, _data.as_ptr(), _data.len(), 0, ::std::ptr::null(), 0) + }; + if ret >= 0 { + Ok((ret as usize, data.take().unwrap())) + } else { + Err(ret) + } + }, + move |x| { + drop(self); + cb(x); + } + ))(); + } + + pub fn write_async(self: Arc, data: Vec, cb: impl FnOnce(Result<(usize, Vec), i32>) + 'static) { + self.__write_async(data, 0, cb) + } + + pub fn write_all_async(self: Arc, data: Vec, cb: impl FnOnce(Result, i32>) + 'static) { + fn inner(me: Arc, data: Vec, offset: usize, cb: impl FnOnce(Result, i32>) + 'static) { + let me2 = me.clone(); + me.__write_async(data, offset, move |result| { + match result { + Ok((len, data)) => { + let new_offset = offset + len; + if new_offset == data.len() { + cb(Ok(data)); + } else { + inner(me2, data, new_offset, cb); + } + } + Err(code) => { + cb(Err(code)); + } + } + }) + } + inner(self, data, 0, cb); + } + + pub fn read_async(self: Arc, out: Vec, cb: impl FnOnce(Result, i32>) + 'static) { + let mut out = Some(out); + (get_async_io_payload( + self.epoll.clone(), + self.fd, + EpollDirection::In, + move |fd| -> Result, i32> { + let _out = out.as_mut().unwrap(); + let out_cap = _out.capacity(); + let ret = unsafe { + _recvfrom(fd, _out.as_mut_ptr(), out_cap, 0, ::std::ptr::null_mut(), ::std::ptr::null_mut()) + }; + if ret >= 0 { + assert!(ret as usize <= out_cap); + unsafe { + _out.set_len(ret as usize); + } + Ok(out.take().unwrap()) + } else { + Err(ret) + } + }, + move |x| { + drop(self); + cb(x); + } + ))(); + } +} + +impl Drop for TcpStream { + fn drop(&mut self) { + unsafe { + File::from_raw_fd(self.fd as _); + } + } +}