Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/wasmtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ tempfile = { workspace = true }
libtest-mimic = { workspace = true }
cranelift-native = { workspace = true }
wasmtime-test-util = { workspace = true }
tokio = { workspace = true, features = ["macros", "sync"] }
tokio = { workspace = true, features = ["macros", "sync", "rt-multi-thread"] }

[build-dependencies]
cc = { workspace = true, optional = true }
Expand Down
15 changes: 11 additions & 4 deletions crates/wasmtime/src/runtime/component/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ impl Instance {
fun: impl AsyncFnOnce(&Accessor<T>) -> R,
) -> Result<R>
where
T: 'static,
T: Send + 'static,
{
check_recursive_run();
let mut store = store.as_context_mut();
Expand Down Expand Up @@ -1165,7 +1165,10 @@ impl Instance {
self,
mut store: StoreContextMut<'_, T>,
mut future: Pin<&mut impl Future<Output = R>>,
) -> Result<R> {
) -> Result<R>
where
T: Send,
{
loop {
// Take `ConcurrentState::futures` out of the instance so we can
// poll it while also safely giving any of the futures inside access
Expand Down Expand Up @@ -1320,7 +1323,7 @@ impl Instance {
}

/// Handle the specified work item, possibly resuming a fiber if applicable.
async fn handle_work_item<T>(
async fn handle_work_item<T: Send>(
self,
store: StoreContextMut<'_, T>,
item: WorkItem,
Expand Down Expand Up @@ -1438,7 +1441,11 @@ impl Instance {
}

/// Execute the specified guest call on a worker fiber.
async fn run_on_worker<T>(self, store: StoreContextMut<'_, T>, item: WorkerItem) -> Result<()> {
async fn run_on_worker<T: Send>(
self,
store: StoreContextMut<'_, T>,
item: WorkerItem,
) -> Result<()> {
let worker = if let Some(fiber) = self.concurrent_state_mut(store.0).worker.take() {
fiber
} else {
Expand Down
4 changes: 2 additions & 2 deletions crates/wasmtime/src/runtime/externals/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Table {
/// [`Store`](`crate::Store`)
#[cfg(feature = "async")]
pub async fn new_async(
mut store: impl AsContextMut<Data: Send>,
mut store: impl AsContextMut,
ty: TableType,
init: Ref,
) -> Result<Table> {
Expand Down Expand Up @@ -350,7 +350,7 @@ impl Table {
#[cfg(feature = "async")]
pub async fn grow_async(
&self,
mut store: impl AsContextMut<Data: Send>,
mut store: impl AsContextMut,
delta: u64,
init: Ref,
) -> Result<u64> {
Expand Down
37 changes: 32 additions & 5 deletions crates/wasmtime/src/runtime/fiber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,13 @@ fn resume_fiber<'a>(
}

/// Create a new `StoreFiber` which runs the specified closure.
pub(crate) fn make_fiber<'a, S>(
///
/// # Safety
///
/// The returned `StoreFiber<'a>` structure is unconditionally `Send` but the
/// send-ness is actually a function of `S`. When `S` is statically known to be
/// `Send` then use the safe [`make_fiber`] function.
pub(crate) unsafe fn make_fiber_unchecked<'a, S>(
store: &mut S,
fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a,
) -> Result<StoreFiber<'a>>
Expand Down Expand Up @@ -852,6 +858,19 @@ where
})
}

/// Safe wrapper around [`make_fiber_unchecked`] which requires that `S` is
/// `Send`.
#[cfg(feature = "component-model-async")]
pub(crate) fn make_fiber<'a, S>(
store: &mut S,
fun: impl FnOnce(&mut S) -> Result<()> + Send + Sync + 'a,
) -> Result<StoreFiber<'a>>
where
S: AsStoreOpaque + Send + ?Sized + 'a,
{
unsafe { make_fiber_unchecked(store, fun) }
}

/// Run the specified function on a newly-created fiber and `.await` its
/// completion.
pub(crate) async fn on_fiber<S, R>(
Expand All @@ -868,10 +887,18 @@ where
debug_assert!(config.async_stack_size > 0);

let mut result = None;
let fiber = make_fiber(store, |store| {
result = Some(func(store));
Ok(())
})?;

// SAFETY: the `StoreFiber` returned by `make_fiber_unchecked` is `Send`
// despite we not actually knowing here whether `S` is `Send` or not. That
// is safe here, however, because this function is already conditionally
// `Send` based on `S`. Additionally `fiber` doesn't escape this function,
// so the future-of-this-function is still correctly `Send`-vs-not.
let fiber = unsafe {
make_fiber_unchecked(store, |store| {
result = Some(func(store));
Ok(())
})?
};

{
let fiber = FiberFuture {
Expand Down
66 changes: 57 additions & 9 deletions crates/wasmtime/src/runtime/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,63 @@ impl Instance {
///
/// This function will also panic, like [`Instance::new`], if any [`Extern`]
/// specified does not belong to `store`.
///
/// # Examples
///
/// An example of using this function:
///
/// ```
/// use wasmtime::{Result, Store, Engine, Config, Module, Instance};
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// let mut config = Config::new();
/// config.async_support(true);
/// let engine = Engine::new(&config)?;
///
/// // For this example, a module with no imports is being used hence
/// // the empty array to `Instance::new_async`.
/// let module = Module::new(&engine, "(module)")?;
/// let mut store = Store::new(&engine, ());
/// let instance = Instance::new_async(&mut store, &module, &[]).await?;
///
/// // ... use `instance` and exports and such ...
///
/// Ok(())
/// }
/// ```
///
/// Note, though, that the future returned from this function is only
/// `Send` if the store's own data is `Send` meaning that this does not
/// compile for example:
///
/// ```compile_fail
/// use wasmtime::{Result, Store, Engine, Config, Module, Instance};
/// use std::rc::Rc;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// let mut config = Config::new();
/// config.async_support(true);
/// let engine = Engine::new(&config)?;
///
/// let module = Module::new(&engine, "(module)")?;
///
/// // Note that `Rc<()>` is NOT `Send`, which is what many future
/// // runtimes require and below will cause a failure.
/// let mut store = Store::new(&engine, Rc::new(()));
///
/// // Compile failure because `Store<Rc<()>>` is not `Send`
/// assert_send(Instance::new_async(&mut store, &module, &[])).await?;
///
/// Ok(())
/// }
///
/// fn assert_send<T: Send>(t: T) -> T { t }
/// ```
#[cfg(feature = "async")]
pub async fn new_async(
mut store: impl AsContextMut<Data: Send>,
mut store: impl AsContextMut,
module: &Module,
imports: &[Extern],
) -> Result<Instance> {
Expand Down Expand Up @@ -229,10 +283,7 @@ impl Instance {
store: &mut StoreContextMut<'_, T>,
module: &Module,
imports: Imports<'_>,
) -> Result<Instance>
where
T: Send + 'static,
{
) -> Result<Instance> {
assert!(
store.0.async_support(),
"must use sync instantiation when async support is disabled",
Expand Down Expand Up @@ -854,10 +905,7 @@ impl<T: 'static> InstancePre<T> {
pub async fn instantiate_async(
&self,
mut store: impl AsContextMut<Data = T>,
) -> Result<Instance>
where
T: Send,
{
) -> Result<Instance> {
let mut store = store.as_context_mut();
let imports = pre_instantiate_raw(
&mut store.0,
Expand Down
11 changes: 2 additions & 9 deletions crates/wasmtime/src/runtime/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,7 @@ impl Memory {
/// This function will panic when used with a non-async
/// [`Store`](`crate::Store`).
#[cfg(feature = "async")]
pub async fn new_async(
mut store: impl AsContextMut<Data: Send>,
ty: MemoryType,
) -> Result<Memory> {
pub async fn new_async(mut store: impl AsContextMut, ty: MemoryType) -> Result<Memory> {
let mut store = store.as_context_mut();
assert!(
store.0.async_support(),
Expand Down Expand Up @@ -623,11 +620,7 @@ impl Memory {
/// This function will panic when used with a non-async
/// [`Store`](`crate::Store`).
#[cfg(feature = "async")]
pub async fn grow_async(
&self,
mut store: impl AsContextMut<Data: Send>,
delta: u64,
) -> Result<u64> {
pub async fn grow_async(&self, mut store: impl AsContextMut, delta: u64) -> Result<u64> {
let mut store = store.as_context_mut();
assert!(
store.0.async_support(),
Expand Down
5 changes: 1 addition & 4 deletions crates/wasmtime/src/runtime/store/async_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,7 @@ impl<T> StoreContextMut<'_, T> {
pub(crate) async fn on_fiber<R: Send + Sync>(
&mut self,
func: impl FnOnce(&mut StoreContextMut<'_, T>) -> R + Send + Sync,
) -> Result<R>
where
T: Send + 'static,
{
) -> Result<R> {
fiber::on_fiber(self.0, |me| func(&mut StoreContextMut(me))).await
}
}