diff --git a/Cargo.lock b/Cargo.lock index ff91f8b..3847de8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,6 +47,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "auto-const-array" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f7df18977a1ee03650ee4b31b4aefed6d56bac188760b6e37610400fe8d4bb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.10", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -89,6 +100,12 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.4.0" @@ -236,7 +253,7 @@ dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", - "memoffset", + "memoffset 0.8.0", "scopeguard", ] @@ -305,6 +322,8 @@ dependencies = [ "base64", "criterion", "hyper", + "monoio", + "monoio-compat", "pin-project", "rand", "rustls-pemfile", @@ -318,6 +337,19 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -339,6 +371,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd" +[[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + [[package]] name = "futures-task" version = "0.3.27" @@ -357,6 +395,15 @@ dependencies = [ "pin-utils", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -374,8 +421,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -470,7 +519,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -498,6 +547,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "io-uring" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "is-terminal" version = "0.4.7" @@ -577,6 +636,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.8.0" @@ -598,6 +666,72 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "monoio" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c91e8e1600b215e4878b5c236e211f3365415407a2b3891c62dbcfd22f778860" +dependencies = [ + "auto-const-array", + "bytes", + "flume", + "fxhash", + "io-uring", + "libc", + "mio", + "monoio-macros", + "nix", + "pin-project-lite", + "socket2 0.5.1", + "threadpool", + "tokio", + "windows-sys 0.48.0", +] + +[[package]] +name = "monoio-compat" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc34bde8c8f3589ce36edbd392ad195450a1817773f10cc8997e2faf25cd2a3" +dependencies = [ + "monoio", + "reusable-box-future", + "tokio", +] + +[[package]] +name = "monoio-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "176a5f5e69613d9e88337cf2a65e11135332b4efbcc628404a7c555e4452084c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.10", +] + +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags", + "cfg-if", + "libc", + "memoffset 0.7.1", + "pin-utils", +] + [[package]] name = "num-traits" version = "0.2.15" @@ -818,6 +952,12 @@ version = "0.6.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" +[[package]] +name = "reusable-box-future" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0e61cd21fbddd85fbd9367b775660a01d388c08a61c6d2824af480b0309bb9" + [[package]] name = "ring" version = "0.16.20" @@ -827,7 +967,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -982,12 +1122,31 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc8d618c6641ae355025c449427f9e96b98abf99a772be3cef6708d15c77147a" +dependencies = [ + "libc", + "windows-sys 0.45.0", +] + [[package]] name = "spin" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "syn" version = "1.0.109" @@ -1045,6 +1204,15 @@ dependencies = [ "syn 2.0.10", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -1070,7 +1238,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.4.9", "tokio-macros", "windows-sys 0.45.0", ] diff --git a/Cargo.toml b/Cargo.toml index dae33f7..db7afd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,8 @@ trybuild = "1.0.80" criterion = "0.4.0" anyhow = "1.0.71" webpki-roots = "0.23.0" +monoio = { version = "*", default-features = false, features = ["async-cancel", "sync", "bytes", "legacy", "macros", "utils", "tokio-compat"] } +monoio-compat = { version = "0.1.2" } [[bench]] name = "unmask" diff --git a/examples/monoio_server.rs b/examples/monoio_server.rs new file mode 100644 index 0000000..0ecca18 --- /dev/null +++ b/examples/monoio_server.rs @@ -0,0 +1,143 @@ +// Copyright 2023 Divy Srivastava +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use fastwebsockets::upgrade; +use fastwebsockets::OpCode; +use fastwebsockets::WebSocketError; +use hyper::server::conn::Http; +use hyper::service::service_fn; +use hyper::Body; +use hyper::Request; +use hyper::Response; +use monoio_compat::AsyncReadExt; +use monoio_compat::AsyncWriteExt; +use monoio_compat::TcpStreamCompat; +use tokio::net::TcpListener; + +async fn handle_client(fut: upgrade::UpgradeFut) -> Result<(), WebSocketError> { + let mut ws = fastwebsockets::FragmentCollector::new(fut.await?); + + loop { + let frame = ws.read_frame().await?; + match frame.opcode { + OpCode::Close => break, + OpCode::Text | OpCode::Binary => { + ws.write_frame(frame).await?; + } + _ => {} + } + } + + Ok(()) +} +async fn server_upgrade( + mut req: Request, +) -> Result, WebSocketError> { + let (response, fut) = upgrade::upgrade(&mut req)?; + + monoio::spawn(async move { + if let Err(e) = handle_client(fut).await { + eprintln!("Error in websocket connection: {}", e); + } + }); + + Ok(response) +} + +#[monoio::main] +async fn main() -> Result<(), WebSocketError> { + let listener = monoio::net::TcpListener::bind("127.0.0.1:8080").unwrap(); + + println!("Server started, listening on {}", "127.0.0.1:8080"); + loop { + let (stream, _) = listener.accept().await?; + + let hyper_conn = HyperConnection(stream); + println!("Client connected"); + monoio::spawn(async move { + let conn_fut = Http::new() + .with_executor(HyperExecutor) + .serve_connection(hyper_conn, service_fn(server_upgrade)) + .with_upgrades(); + if let Err(e) = conn_fut.await { + println!("An error occurred: {:?}", e); + } + }); + } +} + +use std::pin::Pin; +struct HyperConnection(monoio::net::TcpStream); + +impl tokio::io::AsyncRead for HyperConnection { + #[inline] + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } +} + +impl tokio::io::AsyncWrite for HyperConnection { + #[inline] + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + #[inline] + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + #[inline] + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.0).poll_shutdown(cx) + } +} + +impl hyper::client::connect::Connection for HyperConnection { + #[inline] + fn connected(&self) -> hyper::client::connect::Connected { + hyper::client::connect::Connected::new() + } +} + +#[allow(clippy::non_send_fields_in_send_ty)] +unsafe impl Send for HyperConnection {} + +use std::future::Future; +#[derive(Clone)] +struct HyperExecutor; + +impl hyper::rt::Executor for HyperExecutor +where + F: Future + 'static, + F::Output: 'static, +{ + fn execute(&self, fut: F) { + monoio::spawn(fut); + } +}