Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .agent/notes/db-driver-test-rerun-2026-05-03.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# DB Driver Test Rerun 2026-05-03

Config: static registry, bare encoding, native/local plus wasm/remote.

## Results

- [x] actor-db: native/local passed, wasm/remote passed
- [x] actor-db-raw: native/local passed, wasm/remote passed
- [x] actor-db-init-order: native/local passed, wasm/remote passed
- [x] actor-db-pragma-migration: native/local passed, wasm/remote passed
- [x] actor-sleep-db: native/local passed, wasm/remote passed
- [x] actor-db-stress: native/local passed, wasm/remote passed on rerun

## Notes

- Fixed a grace-deadline shutdown bug where SQLite cleanup happened after final state serialization, allowing delayed callbacks from the old generation to issue late DB work.
- Added a shared closed flag to `SqliteDb` so both local and remote SQLite handles fail closed after cleanup.
- The first full wasm/remote stress run hit `ltx trailer checksums must be zeroed` in the kitchen-sink case. The isolated kitchen-sink rerun and the full wasm/remote stress rerun both passed.
21 changes: 20 additions & 1 deletion rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashSet;
use std::io::Cursor;
#[cfg(feature = "sqlite-local")]
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -76,6 +75,11 @@ pub struct SqliteDb {
/// always sets up sqlite storage under the hood, so handle/actor_id are
/// not a reliable signal for whether the user opted in; this flag is.
enabled: bool,
/// Core owns the logical actor-generation DB lifetime. Depot-client can
/// mark one native worker closed, but stale cloned `c.db` handles could
/// otherwise reopen a new local worker or keep sending remote SQL after
/// actor cleanup.
closed: Arc<AtomicBool>,
#[cfg(feature = "sqlite-local")]
// Forced-sync: native SQLite handles are used inside spawn_blocking and
// synchronous diagnostic accessors.
Expand Down Expand Up @@ -108,6 +112,7 @@ impl SqliteDb {
generation,
backend: select_sqlite_backend(enabled, remote_sqlite),
enabled,
closed: Default::default(),
#[cfg(feature = "sqlite-local")]
db: Default::default(),
#[cfg(feature = "sqlite-local")]
Expand Down Expand Up @@ -149,11 +154,13 @@ impl SqliteDb {
}

pub async fn open(&self) -> Result<()> {
self.ensure_open()?;
match self.backend {
SqliteBackend::LocalNative => {
#[cfg(feature = "sqlite-local")]
{
let _open_guard = self.open_lock.lock().await;
self.ensure_open()?;
if self.db.lock().is_some() {
return Ok(());
}
Expand Down Expand Up @@ -305,6 +312,9 @@ impl SqliteDb {
}

pub async fn close(&self) -> Result<()> {
if self.closed.swap(true, Ordering::AcqRel) {
return Ok(());
}
match self.backend {
SqliteBackend::LocalNative => {
#[cfg(feature = "sqlite-local")]
Expand Down Expand Up @@ -346,6 +356,7 @@ impl SqliteDb {

#[cfg(feature = "sqlite-local")]
fn native_db_handle(&self) -> Result<NativeDatabaseHandle> {
self.ensure_open()?;
self.db
.lock()
.as_ref()
Expand Down Expand Up @@ -432,6 +443,7 @@ impl SqliteDb {
}

fn remote_config(&self) -> Result<RemoteSqliteConfig> {
self.ensure_open()?;
let config = self.runtime_config()?;
let generation = config
.generation
Expand All @@ -444,6 +456,13 @@ impl SqliteDb {
})
}

/// Fail fast once `close()` has flipped the shared `closed` flag, so a
/// stale cloned handle from an old actor generation cannot issue late DB
/// work after cleanup.
///
/// # Errors
/// Returns `SqliteRuntimeError::Closed` when the flag is set.
fn ensure_open(&self) -> Result<()> {
    // Acquire pairs with the AcqRel swap performed by `close()`.
    if !self.closed.load(Ordering::Acquire) {
        return Ok(());
    }
    Err(SqliteRuntimeError::Closed.build())
}

async fn remote_exec(&self, sql: String) -> Result<QueryResult> {
let config = self.remote_config()?;
let response = config
Expand Down
179 changes: 105 additions & 74 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1704,17 +1704,121 @@
ShutdownKind::Sleep => LifecycleState::SleepFinalize,
ShutdownKind::Destroy => LifecycleState::Destroying,
});
let reason_label = shutdown_reason_label(reason);
let actor_id = self.ctx.actor_id().to_owned();

Check warning on line 1708 in rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Check warning on line 1708 in rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
// Grace is over once final shutdown starts. Close SQLite before final
// serialization so any late callback from this generation fails closed.
self.ctx.sql().cleanup().await.with_context(|| {
format!("cleanup sqlite before {reason_label} finalization")
})?;
trim_native_allocator_after_shutdown(&actor_id, reason_label);
tracing::debug!(
actor_id = %actor_id,
reason = reason_label,
step = "cleanup_sqlite",
"actor shutdown cleanup step completed"
);
self.save_final_state().await?;
self.close_actor_event_channel();
self.join_aborted_run_handle().await;
Self::finish_shutdown_cleanup_with_ctx(self.ctx.clone(), reason).await?;
let ctx = self.ctx.clone();
tokio::join!(
Self::run_shutdown_task_teardown_sleep_state(
ctx.clone(),
reason_label,
actor_id.clone()
),
Self::run_shutdown_task_wait_for_state_writes(
ctx.clone(),
reason_label,
actor_id.clone()
),
Self::run_shutdown_task_cleanup_alarm(ctx, reason, reason_label, actor_id),
);

if matches!(reason, ShutdownKind::Destroy) {
self.ctx.mark_destroy_completed();
}
self.ctx.record_shutdown_wait(reason, started_at.elapsed());
Ok(())
}

/// Shutdown step: tear down this generation's sleep bookkeeping.
///
/// Runs concurrently with the other `run_shutdown_task_*` steps under
/// `tokio::join!` in the shutdown path; `reason_label` and `actor_id`
/// exist only to tag the structured debug log line.
async fn run_shutdown_task_teardown_sleep_state(
    ctx: ActorContext,
    reason_label: &'static str,
    actor_id: String,
) {
    ctx.teardown_sleep_state().await;
    tracing::debug!(
        actor_id = %actor_id,
        reason = reason_label,
        step = "teardown_sleep_state",
        "actor shutdown cleanup step completed"
    );

    // Test-only hook — presumably lets tests observe that cleanup ran for
    // this reason; confirm against the hook's definition.
    #[cfg(test)]
    run_shutdown_cleanup_hook(&ctx, reason_label);
}

/// Shutdown step: block until all in-flight actor state writes have
/// completed, then emit a structured debug line tagged with the shutdown
/// reason and actor id.
///
/// Runs concurrently with the other `run_shutdown_task_*` steps under
/// `tokio::join!` in the shutdown path.
async fn run_shutdown_task_wait_for_state_writes(
    ctx: ActorContext,
    reason_label: &'static str,
    actor_id: String,
) {
    ctx.wait_for_pending_state_writes().await;
    tracing::debug!(
        actor_id = %actor_id,
        reason = reason_label,
        step = "wait_for_pending_state_writes",
        "actor shutdown cleanup step completed"
    );
}

/// Shutdown step: reconcile alarm state for the given shutdown kind, then
/// wait for any pending alarm writes to land.
///
/// - `Sleep`: syncs the alarm, keeping the persisted engine alarm armed
///   across sleep (so a later instance still has a wake trigger), while
///   aborting the local Tokio timer owned by this actor generation.
/// - `Destroy`: cancels the driver alarm outright.
///
/// `reason_label` and `actor_id` only tag the per-step debug log lines.
async fn run_shutdown_task_cleanup_alarm(
    ctx: ActorContext,
    reason: ShutdownKind,
    reason_label: &'static str,
    actor_id: String,
) {
    match reason {
        ShutdownKind::Sleep => {
            ctx.sync_alarm_logged();
            tracing::debug!(
                actor_id = %actor_id,
                reason = reason_label,
                step = "sync_alarm",
                "actor shutdown cleanup step completed"
            );
            // Keep the persisted engine alarm armed across sleep, but abort the
            // local Tokio timer owned by this actor generation.
            ctx.cancel_local_alarm_timeouts();
            tracing::debug!(
                actor_id = %actor_id,
                reason = reason_label,
                step = "cancel_local_alarm_timeouts",
                "actor shutdown cleanup step completed"
            );
        }
        ShutdownKind::Destroy => {
            ctx.cancel_driver_alarm_logged();
            tracing::debug!(
                actor_id = %actor_id,
                reason = reason_label,
                step = "cancel_driver_alarm",
                "actor shutdown cleanup step completed"
            );
        }
    }

    // Regardless of kind, do not finish shutdown until queued alarm writes
    // have drained.
    ctx.wait_for_pending_alarm_writes().await;
    tracing::debug!(
        actor_id = %actor_id,
        reason = reason_label,
        step = "wait_for_pending_alarm_writes",
        "actor shutdown cleanup step completed"
    );
}

async fn save_final_state(&mut self) -> Result<()> {
let (reply_tx, reply_rx) = oneshot::channel();
if let Err(error) = self.send_actor_event(
Expand Down Expand Up @@ -1757,79 +1861,6 @@
self.ctx.save_state(deltas).await
}

/// Sequential shutdown cleanup for one actor generation: tear down sleep
/// state, drain pending state writes, sync and drain alarm writes, clean
/// up SQLite, then cancel alarms according to the shutdown kind.
///
/// Each step logs a structured debug line tagged with the shutdown reason
/// and actor id. Note the strict ordering: SQLite cleanup happens only
/// after state and alarm writes have drained.
///
/// # Errors
/// Propagates only the SQLite cleanup failure; every other step is
/// infallible here.
async fn finish_shutdown_cleanup_with_ctx(
    ctx: ActorContext,
    reason: ShutdownKind,
) -> Result<()> {
    let reason_label = shutdown_reason_label(reason);
    let actor_id = ctx.actor_id().to_owned();
    ctx.teardown_sleep_state().await;
    tracing::debug!(
        actor_id = %actor_id,
        reason = reason_label,
        step = "teardown_sleep_state",
        "actor shutdown cleanup step completed"
    );
    // Test-only hook — presumably lets tests observe that cleanup ran for
    // this reason; confirm against the hook's definition.
    #[cfg(test)]
    run_shutdown_cleanup_hook(&ctx, reason_label);
    ctx.wait_for_pending_state_writes().await;
    tracing::debug!(
        actor_id = %actor_id,
        reason = reason_label,
        step = "wait_for_pending_state_writes",
        "actor shutdown cleanup step completed"
    );
    ctx.sync_alarm_logged();
    tracing::debug!(
        actor_id = %actor_id,
        reason = reason_label,
        step = "sync_alarm",
        "actor shutdown cleanup step completed"
    );
    ctx.wait_for_pending_alarm_writes().await;
    tracing::debug!(
        actor_id = %actor_id,
        reason = reason_label,
        step = "wait_for_pending_alarm_writes",
        "actor shutdown cleanup step completed"
    );
    ctx.sql()
        .cleanup()
        .await
        .with_context(|| format!("cleanup sqlite during {reason_label} shutdown"))?;
    trim_native_allocator_after_shutdown(&actor_id, reason_label);
    tracing::debug!(
        actor_id = %actor_id,
        reason = reason_label,
        step = "cleanup_sqlite",
        "actor shutdown cleanup step completed"
    );
    match reason {
        // Match the reference TS runtime: keep the persisted engine alarm armed
        // across sleep so the next instance still has a wake trigger, but abort
        // the local Tokio timer owned by the shutting-down instance.
        ShutdownKind::Sleep => {
            ctx.cancel_local_alarm_timeouts();
            tracing::debug!(
                actor_id = %actor_id,
                reason = reason_label,
                step = "cancel_local_alarm_timeouts",
                "actor shutdown cleanup step completed"
            );
        }
        ShutdownKind::Destroy => {
            ctx.cancel_driver_alarm_logged();
            tracing::debug!(
                actor_id = %actor_id,
                reason = reason_label,
                step = "cancel_driver_alarm",
                "actor shutdown cleanup step completed"
            );
        }
    }
    Ok(())
}

fn record_inbox_depths(&self) {
self.ctx
.metrics()
Expand Down
Loading