Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions crates/test-util/src/wast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,13 @@ fn component_test_config(test: &Path) -> TestConfig {

if let Some(parent) = test.parent() {
if parent.ends_with("async")
|| ["trap-in-post-return.wast"]
.into_iter()
.any(|name| Some(name) == test.file_name().and_then(|s| s.to_str()))
|| [
"trap-in-post-return.wast",
"resources.wast",
"multiple-resources.wast",
]
.into_iter()
.any(|name| Some(name) == test.file_name().and_then(|s| s.to_str()))
{
ret.component_model_async = Some(true);
ret.component_model_async_stackful = Some(true);
Expand Down
17 changes: 17 additions & 0 deletions crates/wasmtime/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3009,6 +3009,23 @@ impl Config {
self.shared_memory = enable;
self
}

#[cfg(feature = "component-model")]
#[inline]
pub(crate) fn cm_concurrency_enabled(&self) -> bool {
cfg!(feature = "component-model-async")
&& (self.enabled_features.contains(WasmFeatures::CM_ASYNC)
|| self
.enabled_features
.contains(WasmFeatures::CM_ASYNC_BUILTINS)
|| self
.enabled_features
.contains(WasmFeatures::CM_ASYNC_STACKFUL)
|| self.enabled_features.contains(WasmFeatures::CM_THREADING)
|| self
.enabled_features
.contains(WasmFeatures::CM_ERROR_CONTEXT))
}
}

impl Default for Config {
Expand Down
18 changes: 14 additions & 4 deletions crates/wasmtime/src/runtime/component/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::component::{
HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError, RuntimeInstance,
};
use crate::fiber::{self, StoreFiber, StoreFiberYield};
use crate::prelude::*;
use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
use crate::vm::component::{CallContext, ComponentInstance, HandleTable, ResourceTables};
use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
Expand Down Expand Up @@ -891,6 +892,10 @@ impl<T> Store<T> {
where
T: Send + 'static,
{
ensure!(
self.as_context().0.cm_concurrency_enabled(),
"cannot use `run_concurrent` without enabling component-model async"
);
self.as_context_mut().run_concurrent(fun).await
}

Expand Down Expand Up @@ -987,8 +992,6 @@ impl<T> StoreContextMut<'_, T> {
///
/// # Store-blocking behavior
///
///
///
/// At this time there are certain situations in which the `Future` returned
/// by the `AsyncFnOnce` passed to this function will not be polled for an
/// extended period of time, despite one or more `Waker::wake` events having
Expand Down Expand Up @@ -1059,6 +1062,10 @@ impl<T> StoreContextMut<'_, T> {
where
T: Send + 'static,
{
ensure!(
self.0.cm_concurrency_enabled(),
"cannot use `run_concurrent` without enabling component-model async"
);
self.do_run_concurrent(fun, false).await
}

Expand All @@ -1080,6 +1087,7 @@ impl<T> StoreContextMut<'_, T> {
where
T: Send + 'static,
{
debug_assert!(self.0.cm_concurrency_enabled());
check_recursive_run();
let token = StoreToken::new(self.as_context_mut());

Expand Down Expand Up @@ -1390,7 +1398,8 @@ impl StoreOpaque {
/// - The top-level instance is not already on the current task's call stack.
/// - The instance is not in need of a post-return function call.
/// - `self` has not been poisoned due to a trap.
pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
pub(crate) fn may_enter_concurrent(&mut self, instance: RuntimeInstance) -> bool {
debug_assert!(self.cm_concurrency_enabled());
let state = self.concurrent_state_mut();
if let Some(caller) = state.guest_thread {
instance != state.get_mut(caller.task).unwrap().instance
Expand Down Expand Up @@ -1512,7 +1521,8 @@ impl StoreOpaque {
.set_task_may_block(may_block)
}

pub(crate) fn check_blocking(&mut self) -> Result<()> {
pub(crate) fn check_blocking_concurrent(&mut self) -> Result<()> {
debug_assert!(self.cm_concurrency_enabled());
let state = self.concurrent_state_mut();
let task = state.guest_thread.unwrap().task;
let instance = state.get_mut(task).unwrap().instance.instance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::component::{
use crate::store::{StoreOpaque, StoreToken};
use crate::vm::component::{ComponentInstance, HandleTable, TransmitLocalState};
use crate::vm::{AlwaysMut, VMStore};
use crate::{AsContextMut, StoreContextMut, ValRaw};
use crate::{AsContext, AsContextMut, StoreContextMut, ValRaw};
use crate::{
Error, Result, bail,
error::{Context as _, format_err},
Expand Down Expand Up @@ -1123,6 +1123,8 @@ impl<T> FutureReader<T> {
where
T: func::Lower + func::Lift + Send + Sync + 'static,
{
assert!(store.as_context().0.cm_concurrency_enabled());

struct Producer<P>(P);

impl<D, T: func::Lower + 'static, P: FutureProducer<D, Item = T>> StreamProducer<D>
Expand Down Expand Up @@ -1450,6 +1452,11 @@ where
{
/// Create a new `GuardedFutureReader` with the specified `accessor` and `reader`.
pub fn new(accessor: A, reader: FutureReader<T>) -> Self {
assert!(
accessor
.as_accessor()
.with(|a| a.as_context().0.cm_concurrency_enabled())
);
Self {
reader: Some(reader),
accessor,
Expand Down Expand Up @@ -1503,6 +1510,7 @@ impl<T> StreamReader<T> {
where
T: func::Lower + func::Lift + Send + Sync + 'static,
{
assert!(store.as_context().0.cm_concurrency_enabled());
Self::new_(
store
.as_context_mut()
Expand Down Expand Up @@ -1778,6 +1786,11 @@ where
/// Create a new `GuardedStreamReader` with the specified `accessor` and
/// `reader`.
pub fn new(accessor: A, reader: StreamReader<T>) -> Self {
assert!(
accessor
.as_accessor()
.with(|a| a.as_context().0.cm_concurrency_enabled())
);
Self {
reader: Some(reader),
accessor,
Expand Down
21 changes: 1 addition & 20 deletions crates/wasmtime/src/runtime/component/concurrent_disabled.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::component::func::{LiftContext, LowerContext};
use crate::component::matching::InstanceType;
use crate::component::{ComponentType, Lift, Lower, RuntimeInstance, Val};
use crate::store::StoreOpaque;
use crate::component::{ComponentType, Lift, Lower, Val};
use crate::{Result, bail, error::format_err};
use core::convert::Infallible;
use core::mem::MaybeUninit;
Expand Down Expand Up @@ -150,21 +149,3 @@ unsafe impl Lower for StreamAny {
match self.0 {}
}
}

impl StoreOpaque {
pub(crate) fn check_blocking(&mut self) -> Result<()> {
Ok(())
}

pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
if self.trapped() {
return false;
}

let flags = self
.component_instance(instance.instance)
.instance_flags(instance.index);

unsafe { !flags.needs_post_return() }
}
}
26 changes: 12 additions & 14 deletions crates/wasmtime/src/runtime/component/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,26 +263,24 @@ impl Func {
let store = store.as_context_mut();

#[cfg(feature = "component-model-async")]
{
store
if store.0.cm_concurrency_enabled() {
return store
.run_concurrent_trap_on_idle(async |store| {
self.call_concurrent_dynamic(store, params, results, false)
.await
.map(drop)
})
.await?
}
#[cfg(not(feature = "component-model-async"))]
{
assert!(
store.0.async_support(),
"cannot use `call_async` without enabling async support in the config"
);
let mut store = store;
store
.on_fiber(|store| self.call_impl(store, params, results))
.await?
.await?;
}

assert!(
store.0.async_support(),
"cannot use `call_async` without enabling async support in the config"
);
let mut store = store;
store
.on_fiber(|store| self.call_impl(store, params, results))
.await?
}

fn check_params_results<T>(
Expand Down
4 changes: 2 additions & 2 deletions crates/wasmtime/src/runtime/component/func/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ pub struct LiftContext<'a> {
not(feature = "component-model-async"),
allow(unused, reason = "easier to not #[cfg] away")
)]
concurrent_state: &'a mut ConcurrentState,
concurrent_state: Option<&'a mut ConcurrentState>,
}

#[doc(hidden)]
Expand Down Expand Up @@ -408,7 +408,7 @@ impl<'a> LiftContext<'a> {

#[cfg(feature = "component-model-async")]
pub(crate) fn concurrent_state_mut(&mut self) -> &mut ConcurrentState {
self.concurrent_state
self.concurrent_state.as_deref_mut().unwrap()
}

/// Lifts an `own` resource from the guest at the `idx` specified into its
Expand Down
19 changes: 10 additions & 9 deletions crates/wasmtime/src/runtime/component/func/typed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ where
store.0.async_support(),
"cannot use `call_async` when async support is not enabled on the config"
);

#[cfg(feature = "component-model-async")]
{
if store.0.cm_concurrency_enabled() {
use crate::component::concurrent::TaskId;
use crate::runtime::vm::SendSyncPtr;
use core::ptr::NonNull;
Expand Down Expand Up @@ -236,18 +237,16 @@ where
};

let result = concurrent::queue_call(wrapper.store.as_context_mut(), prepared)?;
wrapper
return wrapper
.store
.as_context_mut()
.run_concurrent_trap_on_idle(async |_| Ok(result.await?.0))
.await?
}
#[cfg(not(feature = "component-model-async"))]
{
store
.on_fiber(|store| self.call_impl(store, params))
.await?
.await?;
}

store
.on_fiber(|store| self.call_impl(store, params))
.await?
}

/// Start a concurrent call to this function.
Expand Down Expand Up @@ -321,6 +320,7 @@ where
{
let result = accessor.as_accessor().with(|mut store| {
let mut store = store.as_context_mut();
assert!(store.0.cm_concurrency_enabled());
assert!(
store.0.async_support(),
"cannot use `call_concurrent` when async support is not enabled on the config"
Expand Down Expand Up @@ -379,6 +379,7 @@ where
Return: 'static,
{
use crate::component::storage::slice_to_storage;
debug_assert!(store.0.cm_concurrency_enabled());

let param_count = if Params::flatten_count() <= MAX_FLAT_PARAMS {
Params::flatten_count()
Expand Down
35 changes: 35 additions & 0 deletions crates/wasmtime/src/runtime/component/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,3 +765,38 @@ pub(crate) mod concurrent_disabled;

#[cfg(not(feature = "component-model-async"))]
pub(crate) use concurrent_disabled as concurrent;

impl crate::runtime::store::StoreOpaque {
#[cfg(feature = "component-model-async")]
pub(crate) fn cm_concurrency_enabled(&self) -> bool {
let enabled = self.concurrent_state().is_some();
debug_assert_eq!(enabled, self.engine().config().cm_concurrency_enabled());
enabled
}

pub(crate) fn check_blocking(&mut self) -> crate::Result<()> {
#[cfg(feature = "component-model-async")]
if self.cm_concurrency_enabled() {
return self.check_blocking_concurrent();
}

Ok(())
}

pub(crate) fn may_enter(&mut self, instance: RuntimeInstance) -> bool {
#[cfg(feature = "component-model-async")]
if self.cm_concurrency_enabled() {
return self.may_enter_concurrent(instance);
}

if self.trapped() {
return false;
}

let flags = self
.component_instance(instance.instance)
.instance_flags(instance.index);

unsafe { !flags.needs_post_return() }
}
}
Loading