Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 38 additions & 26 deletions rivetkit-rust/packages/rivetkit-core/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@
use crate::actor::schedule::{InternalKeepAwakeCallback, LocalAlarmCallback};
use crate::actor::sleep::{CanSleep, SleepState};
use crate::actor::state::{PendingSave, PersistedActor, RequestSaveOpts};
use crate::actor::task::LifecycleEvent;
#[cfg(not(target_arch = "wasm32"))]
use crate::actor::task::{LIFECYCLE_EVENT_INBOX_CHANNEL, actor_channel_overloaded_error};
use crate::actor::task::LifecycleEvent;
use crate::actor::task_types::UserTaskKind;
#[cfg(feature = "wasm-runtime")]
use crate::actor::work_registry::CountGuard;
use crate::actor::work_registry::RegionGuard;
use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime};
use crate::inspector::{Inspector, InspectorSnapshot};
Expand Down Expand Up @@ -107,7 +105,7 @@
// Forced-sync: queue config is read from sync public methods before blocking
// on async queue work.
pub(super) queue_config: Mutex<ActorConfig>,
pub(super) queue_abort_signal: Option<CancellationToken>,
pub(super) queue_abort_signal: Mutex<CancellationToken>,
pub(super) queue_initialize: OnceCell<()>,
// Forced-sync: startup installs preload before any queue method awaits init.
pub(super) queue_preloaded_kv: Mutex<Option<PreloadedKv>>,
Expand Down Expand Up @@ -135,7 +133,7 @@
destroy_requested: AtomicBool,
destroy_completed: AtomicBool,
destroy_completion_notify: Notify,
abort_signal: CancellationToken,
abort_signal: Mutex<CancellationToken>,
shutdown_deadline: CancellationToken,
// Forced-sync: runtime wiring slots are configured through synchronous
// lifecycle setup and cloned before sending events.
Expand All @@ -145,7 +143,7 @@
actor_events: RwLock<Option<mpsc::UnboundedSender<ActorEvent>>>,
pub(super) lifecycle_events: RwLock<Option<mpsc::Sender<LifecycleEvent>>>,
hibernated_connection_liveness_override: RwLock<Option<BTreeSet<(Vec<u8>, Vec<u8>)>>>,
pub(super) lifecycle_event_inbox_capacity: usize,

Check warning on line 146 in rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

View workflow job for this annotation

GitHub Actions / Build rivetkit-wasm

field `lifecycle_event_inbox_capacity` is never read
pub(super) metrics: ActorMetrics,
diagnostics: ActorDiagnostics,
actor_id: String,
Expand Down Expand Up @@ -280,7 +278,7 @@
#[cfg(test)]
schedule_driver_alarm_cancel_count: AtomicUsize::new(0),
queue_config: Mutex::new(config.clone()),
queue_abort_signal: Some(abort_signal.clone()),
queue_abort_signal: Mutex::new(abort_signal.clone()),
queue_initialize: OnceCell::new(),
queue_preloaded_kv: Mutex::new(None),
queue_preloaded_message_entries: Mutex::new(None),
Expand All @@ -303,7 +301,7 @@
destroy_requested: AtomicBool::new(false),
destroy_completed: AtomicBool::new(false),
destroy_completion_notify: Notify::new(),
abort_signal,
abort_signal: Mutex::new(abort_signal),
shutdown_deadline,
inspector: RwLock::new(None),
inspector_attach_count: RwLock::new(None),
Expand Down Expand Up @@ -481,30 +479,44 @@
self.flush_on_shutdown();
self.0.destroy_requested.store(true, Ordering::SeqCst);
self.0.destroy_completed.store(false, Ordering::SeqCst);
self.0.abort_signal.cancel();
self.0.abort_signal.lock().cancel();
}

#[cfg(feature = "wasm-runtime")]
fn mark_destroy_requested_without_spawn(&self) {
self.cancel_sleep_timer();
self.0.destroy_requested.store(true, Ordering::SeqCst);
self.0.destroy_completed.store(false, Ordering::SeqCst);
self.0.abort_signal.cancel();
self.0.abort_signal.lock().cancel();
}

#[doc(hidden)]
pub fn cancel_abort_signal_for_sleep(&self) {
self.0.abort_signal.cancel();
self.0.abort_signal.lock().cancel();
}

pub(crate) fn reset_abort_signal_for_start(&self) {
let mut abort_signal = self.0.abort_signal.lock();
if !abort_signal.is_cancelled() {
return;
}

// Sleep cancels the generation abort signal to break queue waits and the
// run loop out of blocking calls. A restarted actor needs a fresh signal
// so the next generation can wait normally.
let next_signal = CancellationToken::new();
*abort_signal = next_signal.clone();
*self.0.queue_abort_signal.lock() = next_signal;
}

#[doc(hidden)]
pub fn actor_abort_signal(&self) -> CancellationToken {
self.0.abort_signal.clone()
self.0.abort_signal.lock().clone()
}

#[doc(hidden)]
pub fn actor_aborted(&self) -> bool {
self.0.abort_signal.is_cancelled()
self.0.abort_signal.lock().is_cancelled()
}

/// Fires when the shutdown grace deadline has elapsed and core is forcing
Expand Down Expand Up @@ -562,12 +574,8 @@

#[cfg(feature = "wasm-runtime")]
pub fn wait_until(&self, future: impl Future<Output = ()> + 'static) {
let counter = self.0.sleep.work.shutdown_counter.clone();
counter.increment();
let guard = CountGuard::from_incremented(counter);
let ctx = self.clone();
wasm_bindgen_futures::spawn_local(async move {
let _guard = guard;
self.track_shutdown_task(async move {
ctx.record_user_task_started(UserTaskKind::WaitUntil);
let started_at = Instant::now();
future.await;
Expand Down Expand Up @@ -1306,7 +1314,7 @@
self.reset_sleep_timer();
}

pub(crate) fn sleep_config(&self) -> ActorConfig {

Check warning on line 1317 in rivetkit-rust/packages/rivetkit-core/src/actor/context.rs

View workflow job for this annotation

GitHub Actions / Build rivetkit-wasm

method `sleep_config` is never used
self.sleep_state_config()
}

Expand All @@ -1314,6 +1322,10 @@
self.0.sleep_requested.load(Ordering::SeqCst)
}

pub(crate) fn clear_sleep_requested(&self) {
self.0.sleep_requested.store(false, Ordering::SeqCst);
}

fn keep_awake_guard(&self) -> KeepAwakeGuard {
let region = self
.keep_awake_region()
Expand Down Expand Up @@ -1484,15 +1496,15 @@
};

match sender.try_reserve() {
Ok(permit) => {
permit.send(event);
}
#[cfg(target_arch = "wasm32")]
Err(_) => {}
#[cfg(not(target_arch = "wasm32"))]
Err(_) => {
let _ = actor_channel_overloaded_error(
LIFECYCLE_EVENT_INBOX_CHANNEL,
Ok(permit) => {
permit.send(event);
}
#[cfg(target_arch = "wasm32")]
Err(_) => {}
#[cfg(not(target_arch = "wasm32"))]
Err(_) => {
let _ = actor_channel_overloaded_error(
LIFECYCLE_EVENT_INBOX_CHANNEL,
self.0.lifecycle_event_inbox_capacity,
operation,
Some(&self.0.metrics),
Expand Down
14 changes: 3 additions & 11 deletions rivetkit-rust/packages/rivetkit-core/src/actor/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,25 +818,17 @@ impl ActorContext {
timeout: Option<Duration>,
signal: Option<&CancellationToken>,
) -> WaitOutcome {
let actor_abort_signal = self.0.queue_abort_signal.lock().clone();
if signal.is_some_and(CancellationToken::is_cancelled) {
return WaitOutcome::Aborted;
}
if self
.0
.queue_abort_signal
.as_ref()
.is_some_and(CancellationToken::is_cancelled)
{
if actor_abort_signal.is_cancelled() {
return WaitOutcome::Aborted;
}

let notified = self.0.queue_notify.notified();
let actor_aborted = async {
if let Some(signal) = &self.0.queue_abort_signal {
signal.cancelled().await;
} else {
pending::<()>().await;
}
actor_abort_signal.cancelled().await;
};
let external_aborted = async {
if let Some(signal) = signal {
Expand Down
Loading
Loading