Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions nova_cli/src/lib/child_hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
collections::VecDeque,
sync::{atomic::AtomicBool, mpsc},
thread,
time::Duration,
time::{Duration, Instant},
};

use nova_vm::ecmascript::{HostHooks, Job};
Expand All @@ -19,7 +19,7 @@ use crate::{ChildToHostMessage, HostToChildMessage};

pub struct CliChildHooks {
promise_job_queue: RefCell<VecDeque<Job>>,
macrotask_queue: RefCell<Vec<Job>>,
macrotask_queue: RefCell<Vec<(Option<Instant>, Job)>>,
pub(crate) receiver: mpsc::Receiver<HostToChildMessage>,
pub(crate) host_sender: mpsc::SyncSender<ChildToHostMessage>,
ready_to_leave: AtomicBool,
Expand Down Expand Up @@ -78,9 +78,11 @@ impl CliChildHooks {
let mut counter = 0u8;
while !off_thread_job_queue.is_empty() {
counter = counter.wrapping_add(1);
for (i, job) in off_thread_job_queue.iter().enumerate() {
if job.is_finished() {
let job = off_thread_job_queue.swap_remove(i);
let now = Instant::now();
for (i, (deadline, job)) in off_thread_job_queue.iter().enumerate() {
let deadline_reached = deadline.is_none_or(|d| now >= d);
if deadline_reached && job.is_finished() {
let (_, job) = off_thread_job_queue.swap_remove(i);
return Some(job);
}
}
Expand All @@ -96,14 +98,19 @@ impl CliChildHooks {

impl HostHooks for CliChildHooks {
fn enqueue_generic_job(&self, job: Job) {
self.macrotask_queue.borrow_mut().push(job);
self.macrotask_queue.borrow_mut().push((None, job));
}

fn enqueue_promise_job(&self, job: Job) {
self.promise_job_queue.borrow_mut().push_back(job);
}

fn enqueue_timeout_job(&self, _timeout_job: Job, _milliseconds: u64) {}
/// Enqueues `timeout_job` to become runnable no earlier than `milliseconds`
/// from now.
///
/// The relative timeout is converted into an absolute [`Instant`] deadline and
/// stored alongside the job in the macrotask queue; entries with `None` as the
/// deadline (plain generic jobs) are runnable immediately.
// NOTE(review): presumably the macrotask pop loop is what checks
// `now >= deadline` before running the job — confirm against the queue-drain
// code, which is only partially visible here.
fn enqueue_timeout_job(&self, timeout_job: Job, milliseconds: u64) {
// `Instant` is a monotonic clock, so the deadline is unaffected by
// wall-clock adjustments.
let deadline = Instant::now() + Duration::from_millis(milliseconds);
self.macrotask_queue
.borrow_mut()
.push((Some(deadline), timeout_job));
}

fn get_host_data(&self) -> &dyn std::any::Any {
self
Expand Down
29 changes: 21 additions & 8 deletions nova_cli/src/lib/host_hooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@
//! The [`HostHooks`] implementation for the main thread.

use std::{
cell::RefCell, collections::VecDeque, fmt::Debug, path::PathBuf, rc::Rc, sync::mpsc, thread,
time::Duration,
cell::RefCell,
collections::VecDeque,
fmt::Debug,
path::PathBuf,
rc::Rc,
sync::mpsc,
thread,
time::{Duration, Instant},
};

use nova_vm::{
Expand All @@ -29,7 +35,7 @@ pub enum ChildToHostMessage {

pub struct CliHostHooks {
promise_job_queue: RefCell<VecDeque<Job>>,
macrotask_queue: RefCell<Vec<Job>>,
macrotask_queue: RefCell<Vec<(Option<Instant>, Job)>>,
pub(crate) receiver: mpsc::Receiver<ChildToHostMessage>,
pub(crate) own_sender: mpsc::SyncSender<ChildToHostMessage>,
pub(crate) child_senders: RefCell<Vec<mpsc::SyncSender<HostToChildMessage>>>,
Expand Down Expand Up @@ -83,9 +89,11 @@ impl CliHostHooks {
let mut counter = 0u8;
while !off_thread_job_queue.is_empty() {
counter = counter.wrapping_add(1);
for (i, job) in off_thread_job_queue.iter().enumerate() {
if job.is_finished() {
let job = off_thread_job_queue.swap_remove(i);
let now = Instant::now();
for (i, (deadline, job)) in off_thread_job_queue.iter().enumerate() {
let deadline_reached = deadline.is_none_or(|d| now >= d);
if deadline_reached && job.is_finished() {
let (_, job) = off_thread_job_queue.swap_remove(i);
return Some(job);
}
}
Expand All @@ -101,14 +109,19 @@ impl CliHostHooks {

impl HostHooks for CliHostHooks {
fn enqueue_generic_job(&self, job: Job) {
self.macrotask_queue.borrow_mut().push(job);
self.macrotask_queue.borrow_mut().push((None, job));
}

fn enqueue_promise_job(&self, job: Job) {
self.promise_job_queue.borrow_mut().push_back(job);
}

fn enqueue_timeout_job(&self, _timeout_job: Job, _milliseconds: u64) {}
/// Enqueues `timeout_job` to become runnable no earlier than `milliseconds`
/// from now (main-thread host hooks).
///
/// Mirrors the child-thread implementation: the relative timeout becomes an
/// absolute [`Instant`] deadline stored next to the job in the macrotask
/// queue, while `None`-deadline entries remain immediately runnable.
// NOTE(review): `Instant::now() + Duration` can panic on overflow for
// extremely large `milliseconds`; callers appear to reserve `u64::MAX` as a
// "no timeout" sentinel and not enqueue in that case — confirm.
fn enqueue_timeout_job(&self, timeout_job: Job, milliseconds: u64) {
let deadline = Instant::now() + Duration::from_millis(milliseconds);
self.macrotask_queue
.borrow_mut()
.push((Some(deadline), timeout_job));
}

fn load_imported_module<'gc>(
&self,
Expand Down
151 changes: 100 additions & 51 deletions nova_vm/src/ecmascript/builtins/structured_data/atomics_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::{
hint::assert_unchecked,
ops::ControlFlow,
sync::Arc,
thread::{self, JoinHandle},
time::Duration,
};
use std::{hint::assert_unchecked, ops::ControlFlow, sync::Arc, time::Duration};

use ecmascript_atomics::Ordering;

Expand Down Expand Up @@ -1621,52 +1615,106 @@ fn create_wait_result_object<'gc>(
.expect("Should perform GC here")
}

#[derive(Debug)]
struct WaitAsyncJobInner {
data_block: SharedDataBlock,
byte_index_in_buffer: usize,
waiter_record: Arc<WaiterRecord>,
promise_to_resolve: Global<Promise<'static>>,
join_handle: JoinHandle<WaitResult>,
_has_timeout: bool,
}

/// Job representing a pending `Atomics.waitAsync` wait; its `run` resolves the
/// captured promise with `"ok"` or `"timed-out"` based on the waiter record's
/// result. Boxed so the job enum stays small; `repr(transparent)` keeps the
/// wrapper layout-identical to the box.
#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct WaitAsyncJob(Box<WaitAsyncJobInner>);

impl WaitAsyncJob {
pub(crate) fn is_finished(&self) -> bool {
self.0.join_handle.is_finished()
self.0.waiter_record.is_notified()
}

pub(crate) fn _will_halt(&self) -> bool {
self.0._has_timeout
}

// NOTE: The reason for using `GcScope` here even though we could've gotten
// away with `NoGcScope` is that this is essentially a trait impl method,
// but currently without the trait. The job trait will be added eventually
// and we can get rid of this lint exception.
#[allow(unknown_lints, can_use_no_gc_scope)]
pub(crate) fn run<'gc>(self, agent: &mut Agent, gc: GcScope) -> JsResult<'gc, ()> {
pub(crate) fn run<'gc>(self, agent: &mut Agent, gc: GcScope<'gc, '_>) -> JsResult<'gc, ()> {
let gc = gc.into_nogc();

// SAFETY: buffer is a cloned SharedDataBlock; non-dangling.
let waiters = unsafe { self.0.data_block.get_or_init_waiters() };
// a. Perform EnterCriticalSection(WL).
let mut guard = waiters.lock().unwrap();
let waiter_record = self.0.waiter_record;
guard.remove_from_list(self.0.byte_index_in_buffer, waiter_record.clone());

let result = match waiter_record.get_result() {
Some(WaitResult::TimedOut) => WaitResult::TimedOut,
Some(WaitResult::Ok) => WaitResult::Ok,
None => {
waiter_record.set_result(WaitResult::Ok);
WaitResult::Ok
}
};

let promise = self.0.promise_to_resolve.take(agent).bind(gc);
let Ok(result) = self.0.join_handle.join() else {
// Foreign thread died; we can never resolve.
let promise_capability = PromiseCapability::from_promise(promise, true);
match result {
WaitResult::Ok => {
unwrap_try(promise_capability.try_resolve(
agent,
BUILTIN_STRING_MEMORY.ok.into(),
gc,
));
}
WaitResult::TimedOut => {
unwrap_try(promise_capability.try_resolve(
agent,
BUILTIN_STRING_MEMORY.timed_out.into(),
gc,
));
}
}
// c. Perform LeaveCriticalSection(WL).
drop(guard);

// d. Return unused.
Ok(())
}
}

/// State captured for the timeout half of an `Atomics.waitAsync` wait.
struct WaitAsyncTimeoutJobInner {
// Cloned handle to the shared buffer; used to reach its waiter list
// (`get_or_init_waiters`).
data_block: SharedDataBlock,
// Index identifying which waiter list within the buffer the record is
// registered on.
byte_index_in_buffer: usize,
// The waiter to time out; the same `Arc` is shared with the corresponding
// `WaitAsyncJob` so both sides observe one result.
waiter_record: Arc<WaiterRecord>,
}

/// Job enqueued via `HostHooks::enqueue_timeout_job` that, when its deadline
/// elapses, marks the associated `Atomics.waitAsync` waiter as `"timed-out"`
/// and notifies it (a no-op if a result was already set).
pub(crate) struct WaitAsyncTimeoutJob(Box<WaitAsyncTimeoutJobInner>);

impl WaitAsyncTimeoutJob {
pub(crate) fn run<'gc>(self, _agent: &mut Agent, _gc: GcScope<'gc, '_>) -> JsResult<'gc, ()> {
if self.0.waiter_record.get_result().is_some() {
return Ok(());
};
}

// SAFETY: buffer is a cloned SharedDataBlock; non-dangling.
let waiters = unsafe { self.0.data_block.get_or_init_waiters() };
// a. Perform EnterCriticalSection(WL).
let mut guard = waiters.lock().unwrap();

// b. If WL.[[Waiters]] contains waiterRecord, then
// i. Let timeOfJobExecution be the time value (UTC) identifying the current time.
// ii. Assert: ℝ(timeOfJobExecution) ≥ waiterRecord.[[TimeoutTime]] (ignoring potential non-monotonicity of time values).
// iii. Set waiterRecord.[[Result]] to "timed-out".
self.0.waiter_record.set_result(WaitResult::TimedOut);

// iv. Perform RemoveWaiter(WL, waiterRecord).
let waiter_record = self.0.waiter_record.clone();
guard.remove_from_list(self.0.byte_index_in_buffer, self.0.waiter_record);

// v. Perform NotifyWaiter(WL, waiterRecord).
waiter_record.notify_waiters();

// c. Perform LeaveCriticalSection(WL).
let promise_capability = PromiseCapability::from_promise(promise, true);
let result = match result {
WaitResult::Ok => BUILTIN_STRING_MEMORY.ok.into(),
WaitResult::TimedOut => BUILTIN_STRING_MEMORY.timed_out.into(),
};
unwrap_try(promise_capability.try_resolve(agent, result, gc));
drop(guard);

// d. Return unused.
Ok(())
}
Expand All @@ -1689,40 +1737,41 @@ fn enqueue_atomics_wait_async_job<const IS_I64: bool>(
// 1. Let timeoutJob be a new Job Abstract Closure with no parameters that
// captures WL and waiterRecord and performs the following steps when
// called:
let handle = thread::spawn(move || {
// SAFETY: buffer is a cloned SharedDataBlock; non-dangling.
let waiters = unsafe { data_block.get_or_init_waiters() };
let mut guard = waiters.lock().unwrap();

if t == u64::MAX {
waiter_record.wait(guard);
} else {
let dur = Duration::from_millis(t);
let (new_guard, timeout) = waiter_record.wait_timeout(guard, dur);
guard = new_guard;
if timeout.timed_out() {
guard.remove_from_list(byte_index_in_buffer, waiter_record);
// 2. Let now be the time value (UTC) identifying the current time.
// 3. Let currentRealm be the current Realm Record.
// 4. Perform HostEnqueueTimeoutJob(timeoutJob, currentRealm, 𝔽(waiterRecord.[[TimeoutTime]]) - now).

// 31. Perform LeaveCriticalSection(WL).
drop(guard);
let timeout_job_data = if t != u64::MAX {
Some(WaitAsyncTimeoutJobInner {
data_block: data_block.clone(),
byte_index_in_buffer,
waiter_record: waiter_record.clone(),
})
} else {
None
};

// 32. If mode is sync, return waiterRecord.[[Result]].
return WaitResult::TimedOut;
}
}
WaitResult::Ok
});
let wait_async_job = Job {
realm: Some(Global::new(agent, agent.current_realm(gc).unbind())),
inner: InnerJob::WaitAsync(WaitAsyncJob(Box::new(WaitAsyncJobInner {
data_block,
byte_index_in_buffer,
waiter_record,
promise_to_resolve: promise,
join_handle: handle,
_has_timeout: t != u64::MAX,
}))),
};
// 2. Let now be the time value (UTC) identifying the current time.
// 3. Let currentRealm be the current Realm Record.
// 4. Perform HostEnqueueTimeoutJob(timeoutJob, currentRealm, 𝔽(waiterRecord.[[TimeoutTime]]) - now).
agent.host_hooks.enqueue_generic_job(wait_async_job);

if let Some(inner) = timeout_job_data {
let wait_async_timeout_job = Job {
realm: Some(Global::new(agent, agent.current_realm(gc).unbind())),
inner: InnerJob::WaitAsyncTimeout(WaitAsyncTimeoutJob(Box::new(inner))),
};
agent
.host_hooks
.enqueue_timeout_job(wait_async_timeout_job, t);
}

// 5. Return unused.
}
8 changes: 6 additions & 2 deletions nova_vm/src/ecmascript/execution/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ use ahash::AHashMap;
use crate::ecmascript::GlobalEnvironment;
#[cfg(feature = "shared-array-buffer")]
use crate::ecmascript::SharedArrayBuffer;
#[cfg(feature = "atomics")]
use crate::ecmascript::WaitAsyncJob;
#[cfg(feature = "weak-refs")]
use crate::ecmascript::{FinalizationRegistryCleanupJob, clear_kept_objects};
#[cfg(feature = "atomics")]
use crate::ecmascript::{WaitAsyncJob, WaitAsyncTimeoutJob};
use crate::{
ecmascript::{
AbstractModuleMethods, Environment, ErrorHeapData, ExecutionContext, Function,
Expand Down Expand Up @@ -258,6 +258,8 @@ pub(crate) enum InnerJob {
PromiseReaction(PromiseReactionJob),
#[cfg(feature = "atomics")]
WaitAsync(WaitAsyncJob),
#[cfg(feature = "atomics")]
WaitAsyncTimeout(WaitAsyncTimeoutJob),
#[cfg(feature = "weak-refs")]
FinalizationRegistry(FinalizationRegistryCleanupJob),
}
Expand Down Expand Up @@ -315,6 +317,8 @@ impl Job {
InnerJob::PromiseReaction(job) => job.run(agent, gc),
#[cfg(feature = "atomics")]
InnerJob::WaitAsync(job) => job.run(agent, gc),
#[cfg(feature = "atomics")]
InnerJob::WaitAsyncTimeout(job) => job.run(agent, gc),
#[cfg(feature = "weak-refs")]
InnerJob::FinalizationRegistry(job) => {
job.run(agent, gc);
Expand Down
Loading
Loading