Skip to content

Enable support for wasm by disabling io primitives #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ jobs:
# if: startsWith(matrix.os, 'ubuntu')
# run: cross build --target x86_64-unknown-illumos

wasm:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update nightly && rustup default nightly
- run: rustup target add wasm32-unknown-unknown
- name: Install wasm-pack
uses: taiki-e/install-action@wasm-pack
- run: wasm-pack test --headless --chrome -Zbuild-std
env:
RUSTFLAGS: "${{ env.RUSTFLAGS }} -Ctarget-feature=+atomics,+bulk-memory,+mutable-globals"

msrv:
runs-on: ubuntu-latest
strategy:
Expand Down
31 changes: 22 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@ exclude = ["/.*"]
name = "io"
harness = false

[features]
stdweb = ["instant/stdweb"]
wasm-bindgen = ["instant/wasm-bindgen"]

[dependencies]
concurrent-queue = "1.2.2"
cfg-if = "1.0.0"
concurrent-queue = { version = "1.2.2" }
futures-lite = "1.11.0"
instant = "0.1.12"
log = "0.4.11"
once_cell = "1.4.1"
parking = "2.0.0"
polling = "2.0.0"
slab = "0.4.2"
socket2 = { version = "0.4.2", features = ["all"] }
waker-fn = "1.1.0"

[build-dependencies]
autocfg = "1"
[target.'cfg(not(target_family = "wasm"))'.dependencies]
polling = { version = "2.0.0"}
slab = { version = "0.4.2" }
socket2 = { version = "0.4.2", features = ["all"] }

[target."cfg(unix)".dependencies]
libc = "0.2.77"
Expand All @@ -40,12 +45,17 @@ winapi = { version = "0.3.9", features = ["winsock2"] }

[dev-dependencies]
async-channel = "1"
async-net = "1"
blocking = "1"
criterion = "0.3.6"
criterion = { version = "0.4", default-features = false, features = ["cargo_bench_support"] }
tempfile = "3"

[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
async-net = "1"
getrandom = "0.2.7"
signal-hook = "0.3"
tempfile = "3"

[target.'cfg(target_family = "wasm")'.dev-dependencies]
wasm-bindgen-test = "0.3"

[target.'cfg(target_os = "linux")'.dev-dependencies]
inotify = { version = "0.10", default-features = false }
Expand All @@ -54,3 +64,6 @@ timerfd = "1"

[target.'cfg(windows)'.dev-dependencies]
uds_windows = "1"

[build-dependencies]
autocfg = "1"
44 changes: 33 additions & 11 deletions src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
use std::cell::Cell;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

#[cfg(not(target_family = "wasm"))]
use once_cell::sync::Lazy;
#[cfg(not(target_family = "wasm"))]
use std::sync::atomic::AtomicUsize;
#[cfg(not(target_family = "wasm"))]
use std::thread;
use std::time::{Duration, Instant};
#[cfg(not(target_family = "wasm"))]
use std::time::Instant;

use futures_lite::pin;
use once_cell::sync::Lazy;
use waker_fn::waker_fn;

use crate::reactor::Reactor;

/// Number of currently active `block_on()` invocations.
#[cfg(not(target_family = "wasm"))]
static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Unparker for the "async-io" thread.
///
/// The thread is not available on WASM.
#[cfg(not(target_family = "wasm"))]
static UNPARKER: Lazy<parking::Unparker> = Lazy::new(|| {
let (parker, unparker) = parking::pair();

Expand All @@ -34,10 +45,12 @@ static UNPARKER: Lazy<parking::Unparker> = Lazy::new(|| {

/// Initializes the "async-io" thread.
pub(crate) fn init() {
#[cfg(not(target_family = "wasm"))]
Lazy::force(&UNPARKER);
}

/// The main loop for the "async-io" thread.
#[cfg(not(target_family = "wasm"))]
fn main_loop(parker: parking::Parker) {
// The last observed reactor tick.
let mut last_tick = 0;
Expand Down Expand Up @@ -103,14 +116,18 @@ fn main_loop(parker: parking::Parker) {
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
log::trace!("block_on()");

// Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
cfg_if::cfg_if! {
if #[cfg(not(target_family = "wasm"))] {
// Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);

// Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
let _guard = CallOnDrop(|| {
BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
UNPARKER.unpark();
});
// Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
let _guard = CallOnDrop(|| {
BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
UNPARKER.unpark();
});
}
}

// Parker and unparker for notifying the current thread.
let (p, u) = parking::pair();
Expand All @@ -128,7 +145,10 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
move || {
if u.unpark() {
// Check if waking from another thread and if currently blocked on I/O.
if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
// Always wake up on WASM.
if cfg!(target_family = "wasm")
|| (!IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst))
{
Reactor::get().notify();
}
}
Expand Down Expand Up @@ -165,6 +185,7 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
// Try grabbing a lock on the reactor to wait on I/O.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
// Record the instant at which the lock was grabbed.
#[cfg(not(target_family = "wasm"))]
let start = Instant::now();

loop {
Expand Down Expand Up @@ -194,6 +215,7 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
}

// Check if this thread been handling I/O events for a long time.
#[cfg(not(target_family = "wasm"))]
if start.elapsed() > Duration::from_micros(500) {
log::trace!("block_on: stops hogging the reactor");

Expand Down
Loading