Merged
7 changes: 7 additions & 0 deletions CLAUDE.md
@@ -215,6 +215,13 @@ When the user asks to track something in a note, store it in `.agent/notes/` by
- Reserve `tokio::time::sleep` for: per-call timeouts via `tokio::select!`, retry/reconnect backoff, deliberate debounce windows, or `sleep_until(deadline)` arms in an event-select loop. If it is inside a `loop { check; sleep }` body, it is polling and should be event-driven instead.
- Never add unexplained wall-clock defers like `sleep(1ms)` to decouple a spawn from its caller. Use `tokio::task::yield_now().await` or rely on the spawn itself.

## Memory Leaks

- Never call `Box::leak` inside a per-request, per-error, or per-call code path. If the leak is for a `'static` reference required by an upstream API (e.g. `RivetErrorSchema`), intern the leaked value through a process-global `LazyLock<scc::HashMap<Key, &'static T>>` keyed on its identity so each unique value is leaked at most once. Example: `BRIDGE_RIVET_ERROR_SCHEMAS` in `rivetkit-typescript/packages/rivetkit-napi/src/actor_factory.rs`.
- If every field in a leaked struct is a compile-time constant, use a `static`/`const` instead of `Box::leak(Box::new(...))`.
- `std::mem::forget` is only acceptable when an FFI handle cannot be dropped in the current context (e.g. napi `Ref::unref` requires an `Env`). Document the constraint inline and ensure the leak is bounded per actor/connection lifetime, not per call. Prefer routing the drop through an Env-bearing thread when possible.
- Spawned futures that capture JS callbacks or other heavy resources must have a guaranteed completion path (e.g. a `CancellationToken` whose clones are guaranteed to drop). A `spawn_local(async move { token.cancelled().await; ... })` only drains if every clone of the token is dropped or cancelled.
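
The interning rule above can be sketched as follows. This is a minimal, std-only illustration, not the code in `actor_factory.rs`: `ErrorSchema`, `SchemaKey`, `SCHEMAS`, and `intern_schema` are hypothetical names, and a `Mutex<HashMap>` is swapped in for the `scc::HashMap` named in the rule so the sketch has no external dependencies.

```rust
use std::collections::HashMap;
use std::sync::{LazyLock, Mutex};

/// Hypothetical schema type standing in for something like `RivetErrorSchema`.
#[derive(Debug)]
pub struct ErrorSchema {
    pub group: String,
    pub code: String,
}

/// Identity of a schema: two schemas with the same key share one allocation.
type SchemaKey = (String, String);

/// Process-global intern table (assumption: std `Mutex<HashMap>` in place of
/// `scc::HashMap`). Each unique key is leaked at most once.
static SCHEMAS: LazyLock<Mutex<HashMap<SchemaKey, &'static ErrorSchema>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

/// Returns a `'static` schema for the key, leaking only on first sight of it.
pub fn intern_schema(group: &str, code: &str) -> &'static ErrorSchema {
    let mut table = SCHEMAS.lock().expect("schema table poisoned");
    *table
        .entry((group.to_owned(), code.to_owned()))
        .or_insert_with(|| {
            // The only `Box::leak` call: runs once per unique key.
            let leaked: &'static ErrorSchema = Box::leak(Box::new(ErrorSchema {
                group: group.to_owned(),
                code: code.to_owned(),
            }));
            leaked
        })
}
```

Calling `intern_schema("actor", "aborted")` repeatedly from a per-request path returns the same pointer each time, so total leaked memory is bounded by the number of distinct keys rather than by request volume.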

## Async Rust Locks

- Async Rust code defaults to `tokio::sync::Mutex` / `tokio::sync::RwLock`. Do not use `std::sync::Mutex` / `std::sync::RwLock`.
1 change: 1 addition & 0 deletions Cargo.lock

9 changes: 9 additions & 0 deletions docs-internal/engine/napi-bridge.md
@@ -34,6 +34,15 @@ Rules for `rivetkit-typescript/packages/rivetkit-napi/`. The bridge is pure plum
- Receive-loop `SerializeState` handling stays inline in `napi_actor_events.rs`, reuses the shared `state_deltas_from_payload(...)` converter from `actor_context.rs`, and only cancels the adapter abort token on `Destroy` or final adapter teardown, not on `Sleep`.
- Receive-loop NAPI optional callbacks preserve the TypeScript runtime defaults: missing `onBeforeSubscribe` allows the subscription, missing workflow callbacks reply `None`, and missing connection lifecycle hooks still accept the connection while leaving the existing empty conn state untouched.

## Runtime-state reference cleanup

- `ActorContextShared::runtime_state` stores an N-API `Ref<()>` for the JS-only actor runtime state bag. `Ref::unref(env)` and reference deletion require an `Env`, but `reset_runtime_state()` runs from receive-loop worker paths and `Drop for ActorContextShared` may run without an active JS callback frame.
- The current `mem::forget` fallback in `actor_context.rs` keeps debug and release behavior aligned when no `Env` is available, but it leaks one JS object reference per actor wake cycle that created runtime state.
- The intended fix is to create an actor-shared cleanup `ThreadsafeFunction` the first time `runtime_state(env)` has an `Env`. Stale `Ref<()>` values should be wrapped in a payload whose `Drop` forgets the reference only if it was never successfully unreffed, then queued to that TSF from `reset_runtime_state()` and `Drop`.
- The TSF callback must run on the JS thread, call `ref.unref(ctx.env)`, and avoid invoking user callbacks. The TSF itself should be unreffed from the event loop so it does not keep Node alive.
- Shutdown is the hard edge: if the TSF is closing or Node supplies a null `Env` during addon teardown, the payload must fall back to the existing bounded process-lifetime leak instead of dropping a live `Ref<()>` and tripping napi-rs debug assertions.
- Before replacing the fallback, add a NAPI integration test that creates runtime state across many actor wake/destroy cycles, waits for the cleanup TSF to drain, and verifies native reference counts return to zero.
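
The payload described above, whose `Drop` forgets the reference only if it was never unreffed, might look roughly like this. It is a sketch under stated assumptions: `FakeRef` is a hypothetical stand-in for the napi-rs `Ref<()>` (it panics when dropped live, mimicking the debug assertion), `env_available` stands in for holding a valid `Env`, and the `ThreadsafeFunction` queueing is not modeled at all.

```rust
use std::mem;

/// Hypothetical stand-in for an N-API reference. Dropping it while still
/// live is treated as a bug, like the napi-rs debug assertion.
pub struct FakeRef {
    live: bool,
}

impl FakeRef {
    pub fn new() -> Self {
        Self { live: true }
    }

    /// Models `Ref::unref(env)`; fails when no `Env` is available.
    pub fn unref(&mut self, env_available: bool) -> Result<(), &'static str> {
        if !env_available {
            return Err("no Env available");
        }
        self.live = false;
        Ok(())
    }
}

impl Drop for FakeRef {
    fn drop(&mut self) {
        assert!(!self.live, "dropped a live reference without unref");
    }
}

/// Payload that would be queued to the cleanup callback.
pub struct StaleRef {
    inner: Option<FakeRef>,
}

impl StaleRef {
    pub fn new(reference: FakeRef) -> Self {
        Self { inner: Some(reference) }
    }

    /// Intended to run on the JS thread. Returns `true` if the reference
    /// was released; otherwise the payload's `Drop` leaks it.
    pub fn release(mut self, env_available: bool) -> bool {
        let Some(mut reference) = self.inner.take() else {
            return false;
        };
        if reference.unref(env_available).is_ok() {
            true // `reference` is no longer live, so dropping it is safe
        } else {
            self.inner = Some(reference); // hand back for the Drop fallback
            false
        }
    }
}

impl Drop for StaleRef {
    fn drop(&mut self) {
        if let Some(reference) = self.inner.take() {
            // Never unreffed (e.g. teardown with no Env): bounded leak.
            mem::forget(reference);
        }
    }
}
```

The design point is that the fallback lives in one place: whether the payload is dropped unprocessed, or processed without an `Env`, the live reference is forgotten rather than dropped.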

## Cancellation bridging

- For non-idempotent native waits like `queue.enqueueAndWait()`, bridge JS `AbortSignal` through a standalone native `CancellationToken`. Timeout-slicing is only safe for receive-style polling calls like `waitForNames()`.
86 changes: 86 additions & 0 deletions pnpm-lock.yaml

2 changes: 2 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/CLAUDE.md
@@ -3,11 +3,13 @@
## Module layout

- Actor subsystem implementations belong under `src/actor/`; keep root module aliases only for compatibility with existing public callers.
- Public HTTP status promotion for bridged runtime errors belongs in `src/error.rs::public_error_status_code`; NAPI and wasm adapters should call core instead of duplicating mappings.
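
As a rough illustration of the "call core instead of duplicating mappings" rule, an adapter could wrap the core function like this. The mapping below is a trimmed copy included only so the sketch compiles on its own. `adapter_status` and its fallback of 500 are assumptions for illustration, not behavior confirmed by this change.

```rust
/// Trimmed copy of the core mapping, for illustration only. In real code
/// the adapter would import this from rivetkit-core's `src/error.rs`.
pub fn public_error_status_code(group: &str, code: &str) -> Option<u16> {
    match (group, code) {
        ("auth", "forbidden") => Some(403),
        ("actor", "action_not_found") => Some(404),
        ("actor", "action_timed_out") => Some(408),
        ("message", "incoming_too_long" | "outgoing_too_long") => Some(400),
        _ => None,
    }
}

/// Hypothetical adapter helper: defer to core for known public errors and
/// fall back to 500 (an assumed default) for everything else.
pub fn adapter_status(group: &str, code: &str) -> u16 {
    public_error_status_code(group, code).unwrap_or(500)
}
```

Because the adapter holds no table of its own, adding a new public error only requires touching the core `match`.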

## Sleep state invariants

- Any mutation that changes a `can_sleep` input must call `ActorContext::reset_sleep_timer()` so the `ActorTask` sleep deadline is re-evaluated. Inputs are: `ready`/`started`, `no_sleep`, `active_http_request_count`, `sleep_keep_awake_count`, `sleep_internal_keep_awake_count`, `pending_disconnect_count`, `conns()`, and `websocket_callback_count`. Missing this call leaves the sleep timer armed against stale state and triggers the `"sleep idle deadline elapsed but actor stayed awake"` warning on the next tick.
- `ActorContext::set_prevent_sleep(...)` / `prevent_sleep()` are deprecated no-ops kept for NAPI bridge compatibility. Use `keep_awake(future)` (holds counter while awaited) or `wait_until(future)` (tracked shutdown task) instead. Do not reintroduce a `prevent_sleep` field, a `CanSleep::PreventSleep` variant, or branches that read it.
- Runtime-owned promises that must drain during shutdown should use `ActorContext::register_task(...)`, not public `wait_until(...)`, so metrics and runtime intent stay distinct. Registered tasks must race user work against `shutdown_deadline_token()` so shutdown cannot hang forever.
- `ctx.sleep()` and `ctx.destroy()` return `Result<()>`. They error with `ActorLifecycleError::Starting` when called before startup completes and `ActorLifecycleError::Stopping` if the requested flag has already been set this generation (atomic `swap(true, ...)`). Internal idle-timer paths log and suppress the already-requested error.
- The grace deadline path (`on_sleep_grace_deadline`) aborts the user `run` handle and cancels `shutdown_deadline_token()`. Foreign-runtime adapters running `onSleep` / `onDestroy` must observe that token via `tokio::select!` so SQLite teardown does not race user cleanup work.
- Counter `register_zero_notify(&idle_notify)` hooks only drive shutdown drain waits. They are not a substitute for the activity-dirty notification, so any new sleep-affecting counter must also notify on transitions that change `can_sleep`.
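
The `ctx.sleep()` / `ctx.destroy()` error behavior described above can be sketched with a pair of atomics. This is a simplified model, not the `ActorContext` implementation: `SleepFlags`, `LifecycleError`, `mark_started`, and `request_sleep` are hypothetical names, and the memory orderings are one reasonable choice rather than the ones core uses.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical mirror of the two lifecycle errors named above.
#[derive(Debug, PartialEq, Eq)]
pub enum LifecycleError {
    Starting,
    Stopping,
}

/// Simplified per-generation flags.
pub struct SleepFlags {
    started: AtomicBool,
    sleep_requested: AtomicBool,
}

impl SleepFlags {
    pub fn new() -> Self {
        Self {
            started: AtomicBool::new(false),
            sleep_requested: AtomicBool::new(false),
        }
    }

    pub fn mark_started(&self) {
        self.started.store(true, Ordering::Release);
    }

    /// Errors with `Starting` before startup completes, and with `Stopping`
    /// if the flag was already set this generation.
    pub fn request_sleep(&self) -> Result<(), LifecycleError> {
        if !self.started.load(Ordering::Acquire) {
            return Err(LifecycleError::Starting);
        }
        // `swap` returns the previous value, so exactly one caller sees
        // `false` and wins the request; every later caller sees `true`.
        if self.sleep_requested.swap(true, Ordering::AcqRel) {
            return Err(LifecycleError::Stopping);
        }
        Ok(())
    }
}
```

Using `swap` rather than a separate load and store makes the check-and-set a single atomic step, so two concurrent requests cannot both succeed.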
36 changes: 36 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/actor/context.rs
@@ -145,7 +145,7 @@
actor_events: RwLock<Option<mpsc::UnboundedSender<ActorEvent>>>,
pub(super) lifecycle_events: RwLock<Option<mpsc::Sender<LifecycleEvent>>>,
hibernated_connection_liveness_override: RwLock<Option<BTreeSet<(Vec<u8>, Vec<u8>)>>>,
pub(super) lifecycle_event_inbox_capacity: usize,

// CI warning (GitHub Actions / Build rivetkit-wasm) on line 148 of rivetkit-rust/packages/rivetkit-core/src/actor/context.rs: field `lifecycle_event_inbox_capacity` is never read
pub(super) metrics: ActorMetrics,
diagnostics: ActorDiagnostics,
actor_id: String,
@@ -552,6 +552,14 @@
});
}

#[cfg(not(feature = "wasm-runtime"))]
pub fn register_task(&self, future: impl Future<Output = ()> + Send + 'static) {
    let ctx = self.clone();
    self.track_shutdown_task(async move {
        Self::run_registered_task(ctx, future).await;
    });
}

#[cfg(feature = "wasm-runtime")]
pub fn wait_until(&self, future: impl Future<Output = ()> + 'static) {
let counter = self.0.sleep.work.shutdown_counter.clone();
@@ -568,6 +576,34 @@
});
}

#[cfg(feature = "wasm-runtime")]
pub fn register_task(&self, future: impl Future<Output = ()> + 'static) {
    let ctx = self.clone();
    self.track_shutdown_task(async move {
        Self::run_registered_task(ctx, future).await;
    });
}

async fn run_registered_task<F>(ctx: ActorContext, future: F)
where
    F: Future<Output = ()>,
{
    let shutdown_deadline = ctx.shutdown_deadline_token();
    ctx.record_user_task_started(UserTaskKind::RegisteredTask);
    let started_at = Instant::now();
    tokio::select! {
        _ = future => {}
        _ = shutdown_deadline.cancelled() => {
            tracing::warn!(
                actor_id = %ctx.actor_id(),
                reason = "shutdown_deadline_elapsed",
                "registered task cancelled by shutdown deadline"
            );
        }
    }
    ctx.record_user_task_finished(UserTaskKind::RegisteredTask, started_at.elapsed());
}

pub async fn keep_awake<F>(&self, future: F) -> F::Output
where
F: Future,
@@ -1270,7 +1306,7 @@
self.reset_sleep_timer();
}

pub(crate) fn sleep_config(&self) -> ActorConfig {

// CI warning (GitHub Actions / Build rivetkit-wasm) on line 1309 of rivetkit-rust/packages/rivetkit-core/src/actor/context.rs: method `sleep_config` is never used
self.sleep_state_config()
}

5 changes: 4 additions & 1 deletion rivetkit-rust/packages/rivetkit-core/src/actor/task_types.rs
@@ -39,12 +39,13 @@ pub enum UserTaskKind {
ScheduledAction,
DisconnectCallback,
WaitUntil,
RegisteredTask,
SleepFinalize,
DestroyRequest,
}

impl UserTaskKind {
-pub(crate) const ALL: [Self; 10] = [
+pub(crate) const ALL: [Self; 11] = [
Self::Action,
Self::Http,
Self::WebSocketLifetime,
@@ -53,6 +54,7 @@ impl UserTaskKind {
Self::ScheduledAction,
Self::DisconnectCallback,
Self::WaitUntil,
Self::RegisteredTask,
Self::SleepFinalize,
Self::DestroyRequest,
];
@@ -67,6 +69,7 @@ impl UserTaskKind {
Self::ScheduledAction => "scheduled_action",
Self::DisconnectCallback => "disconnect_callback",
Self::WaitUntil => "wait_until",
Self::RegisteredTask => "registered_task",
Self::SleepFinalize => "sleep_finalize",
Self::DestroyRequest => "destroy_request",
}
23 changes: 23 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/error.rs
@@ -1,6 +1,29 @@
use rivet_error::*;
use serde::{Deserialize, Serialize};

pub fn public_error_status_code(group: &str, code: &str) -> Option<u16> {
    match (group, code) {
        ("auth", "forbidden") => Some(403),
        ("actor", "action_not_found") => Some(404),
        ("actor", "action_timed_out") => Some(408),
        ("actor", "aborted") => Some(400),
        ("message", "incoming_too_long" | "outgoing_too_long") => Some(400),
        (
            "queue",
            "full"
            | "message_too_large"
            | "message_invalid"
            | "invalid_payload"
            | "invalid_completion_payload"
            | "already_completed"
            | "previous_message_not_completed"
            | "complete_not_configured"
            | "timed_out",
        ) => Some(400),
        _ => None,
    }
}

#[derive(RivetError, Debug, Clone, Deserialize, Serialize)]
#[error("actor")]
pub enum ActorLifecycle {
@@ -173,7 +196,7 @@

#[derive(RivetError, Debug, Clone, Deserialize, Serialize)]
#[error("engine")]
pub(crate) enum EngineProcessError {

// CI warning (GitHub Actions / Build rivetkit-wasm) on line 199 of rivetkit-rust/packages/rivetkit-core/src/error.rs: enum `EngineProcessError` is never used
#[error(
"binary_not_found",
"Engine binary was not found.",