Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ed61068
feat: US-001 - SqliteSystem: support multiple registered files
NathanFlurry Mar 18, 2026
393163e
feat: US-002 - Database: add fileName getter and onClose callback sup…
NathanFlurry Mar 18, 2026
f3cead8
feat: US-003 - Database.close(): serialize file unregistration under …
NathanFlurry Mar 18, 2026
c695329
feat: US-004 - SqliteVfs: support multiple open() calls and database …
NathanFlurry Mar 18, 2026
e05f6f2
feat: US-005 - VFS callbacks: re-read HEAPU8 after await for buffer d…
NathanFlurry Mar 18, 2026
456060d
feat: US-006 - Upstream patch: @rivetkit/sqlite accept pre-compiled W…
NathanFlurry Mar 18, 2026
020e16c
feat: US-007 - Extract ISqliteVfs interface
NathanFlurry Mar 18, 2026
15b0700
feat: US-008 - Implement SqliteVfsPool with instance lifecycle and mo…
NathanFlurry Mar 18, 2026
c8923e5
feat: US-009 - SqliteVfsPool: release with short name lifecycle and p…
NathanFlurry Mar 18, 2026
5e8563e
feat: US-010 - SqliteVfsPool: idle timer scale-down and in-flight op …
NathanFlurry Mar 18, 2026
98e33ee
feat: US-011 - Implement PooledSqliteHandle with double-release guard
NathanFlurry Mar 18, 2026
28380d3
feat: US-012 - SqliteVfsPool: graceful shutdown
NathanFlurry Mar 18, 2026
9ce0261
feat: US-013 - ActorDriver interface: add actorId to createSqliteVfs
NathanFlurry Mar 18, 2026
a405f19
feat: US-014 - File-system driver: integrate SqliteVfsPool
NathanFlurry Mar 18, 2026
efa46ba
feat: US-015 - Engine driver: integrate SqliteVfsPool
NathanFlurry Mar 18, 2026
71b05a9
feat: US-016 - Clean up dead code and update test helpers
NathanFlurry Mar 18, 2026
853f258
feat: US-017 - Scope per-database AsyncMutex to closed flag only
NathanFlurry Mar 18, 2026
e29d24e
feat: US-018 - Fix forceCloseByFilePrefix prefix collision with numer…
NathanFlurry Mar 18, 2026
f574174
feat: US-019 - Add Database.close() idempotency guard
NathanFlurry Mar 18, 2026
bd0af8b
feat: US-020 - Fix WASM module promise rejection caching
NathanFlurry Mar 18, 2026
141aa78
feat: US-021 - Validate actorsPerInstance >= 1 in pool constructor
NathanFlurry Mar 18, 2026
93c575e
feat: US-022 - Make trackOp a private method on SqliteVfsPool
NathanFlurry Mar 18, 2026
1924dac
feat: US-023 - Fix #resolveFile to use O(1) Map.get instead of O(N) loop
NathanFlurry Mar 18, 2026
62e55cc
feat: US-024 - Wrap Database operations (exec, run, query, close) wit…
NathanFlurry Mar 18, 2026
1685bbe
feat: US-025 - Fix concurrent acquire() creating duplicate instances …
NathanFlurry Mar 18, 2026
94ef6dc
feat: US-026 - Add opsInFlight check to idle timer callback
NathanFlurry Mar 18, 2026
aba18b8
chore: add adversarial review findings to PRD and commit ralph artifacts
NathanFlurry Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions rivetkit-typescript/packages/rivetkit/src/actor/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { type AnyConn } from "./conn/mod";
import type { AnyActorInstance } from "./instance/mod";
import type { RegistryConfig } from "@/registry/config";
import type { RawDatabaseClient } from "@/db/config";
import type { SqliteVfs } from "@rivetkit/sqlite-vfs";
import type { ISqliteVfs } from "@rivetkit/sqlite-vfs";
import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core";

export type ActorDriverBuilder = (
Expand Down Expand Up @@ -92,13 +92,12 @@ export interface ActorDriver {
* Creates a SQLite VFS instance for creating KV-backed databases.
* If not provided, the database provider will need an override.
*
* @rivetkit/sqlite's async build is not re-entrant per module instance. Drivers
* should return a new instance per call for actor-level isolation.
* Drivers may use the actorId for pool-based instance assignment.
*
* This is a method (not a property) so drivers can use dynamic imports,
* keeping the core driver tree-shakeable from @rivetkit/sqlite.
*/
createSqliteVfs?(): SqliteVfs | Promise<SqliteVfs>;
createSqliteVfs?(actorId: string): ISqliteVfs | Promise<ISqliteVfs>;

/**
* Requests the actor to go to sleep.
Expand Down
11 changes: 5 additions & 6 deletions rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { OtlpExportTraceServiceRequestJson } from "@rivetkit/traces";

Check failure on line 1 in rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:
import {
createNoopTraces,
createTraces,
Expand All @@ -6,7 +6,7 @@
type SpanStatusInput,
type Traces,
} from "@rivetkit/traces";
import type { SqliteVfs } from "@rivetkit/sqlite-vfs";
import type { ISqliteVfs } from "@rivetkit/sqlite-vfs";
import invariant from "invariant";
import type { ActorKey } from "@/actor/mod";
import type { Client } from "@/client/client";
Expand Down Expand Up @@ -171,7 +171,7 @@
// MARK: - Variables & Database
#vars?: V;
#db?: InferDatabaseClient<DB>;
#sqliteVfs?: SqliteVfs;
#sqliteVfs?: ISqliteVfs;

// MARK: - Background Tasks
#backgroundPromises: Promise<void>[] = [];
Expand Down Expand Up @@ -1471,11 +1471,10 @@

let client: InferDatabaseClient<DB> | undefined;
try {
// Every actor gets its own SqliteVfs/@rivetkit/sqlite instance. The async
// @rivetkit/sqlite build is not re-entrant, and sharing one instance across
// actors can cause cross-actor contention and runtime corruption.
// Acquire a SQLite VFS handle for this actor. The driver may return a
// standalone VFS or a pooled handle that shares a WASM instance.
if (!this.#sqliteVfs && this.driver.createSqliteVfs) {
this.#sqliteVfs = await this.driver.createSqliteVfs();
this.#sqliteVfs = await this.driver.createSqliteVfs(this.#actorId);
}

client = await this.#config.db.createClient({
Expand Down
9 changes: 4 additions & 5 deletions rivetkit-typescript/packages/rivetkit/src/db/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { SqliteVfs } from "@rivetkit/sqlite-vfs";
import type { ISqliteVfs } from "@rivetkit/sqlite-vfs";

export type AnyDatabaseProvider = DatabaseProvider<any> | undefined;

Expand Down Expand Up @@ -35,11 +35,10 @@ export interface DatabaseProviderContext {
};

/**
* SQLite VFS instance for creating KV-backed databases.
* This should be actor-scoped because @rivetkit/sqlite is not re-entrant per
* module instance.
* SQLite VFS handle for creating KV-backed databases.
* May be a standalone VFS or a pooled handle from SqliteVfsPool.
*/
sqliteVfs?: SqliteVfs;
sqliteVfs?: ISqliteVfs;
}

export type DatabaseProvider<DB extends RawAccess> = {
Expand Down
141 changes: 66 additions & 75 deletions rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Database } from "@rivetkit/sqlite-vfs";

Check failure on line 1 in rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:
import {
drizzle as proxyDrizzle,
type SqliteRemoteDatabase,
Expand Down Expand Up @@ -40,27 +40,27 @@
params: any[],
method: "run" | "all" | "values" | "get",
): Promise<{ rows: any }> => {
return mutex.run(async () => {
await mutex.run(async () => {
if (isClosed()) {
throw new Error("database is closed");
}
});

if (method === "run") {
await waDb.run(sql, toSqliteBindings(params));
return { rows: [] };
}
if (method === "run") {
await waDb.run(sql, toSqliteBindings(params));
return { rows: [] };
}

// For all/get/values, use parameterized query
const result = await waDb.query(sql, toSqliteBindings(params));
// For all/get/values, use parameterized query
const result = await waDb.query(sql, toSqliteBindings(params));

// drizzle's mapResultRow accesses rows by column index (positional arrays)
// so we return raw arrays for all methods
if (method === "get") {
return { rows: result.rows[0] };
}
// drizzle's mapResultRow accesses rows by column index (positional arrays)
// so we return raw arrays for all methods
if (method === "get") {
return { rows: result.rows[0] };
}

return { rows: result.rows };
});
return { rows: result.rows };
};
}

Expand All @@ -70,29 +70,24 @@
*/
async function runInlineMigrations(
waDb: Database,
mutex: AsyncMutex,
migrations: any,
): Promise<void> {
// Create migrations table
await mutex.run(() =>
waDb.exec(`
await waDb.exec(`
CREATE TABLE IF NOT EXISTS __drizzle_migrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hash TEXT NOT NULL,
created_at INTEGER
)
`),
);
`);

// Get the last applied migration
let lastCreatedAt = 0;
await mutex.run(() =>
waDb.exec(
"SELECT id, hash, created_at FROM __drizzle_migrations ORDER BY created_at DESC LIMIT 1",
(row) => {
lastCreatedAt = Number(row[2]) || 0;
},
),
await waDb.exec(
"SELECT id, hash, created_at FROM __drizzle_migrations ORDER BY created_at DESC LIMIT 1",
(row) => {
lastCreatedAt = Number(row[2]) || 0;
},
);

// Apply pending migrations from journal entries
Expand All @@ -109,14 +104,12 @@
if (!sql) continue;

// Execute migration SQL
await mutex.run(() => waDb.exec(sql));
await waDb.exec(sql);

// Record migration
await mutex.run(() =>
waDb.run(
"INSERT INTO __drizzle_migrations (hash, created_at) VALUES (?, ?)",
[entry.tag, entry.when],
),
await waDb.run(
"INSERT INTO __drizzle_migrations (hash, created_at) VALUES (?, ?)",
[entry.tag, entry.when],
);
}
}
Expand Down Expand Up @@ -165,63 +158,61 @@
query: string,
...args: unknown[]
): Promise<TRow[]> => {
return mutex.run(async () => {
ensureOpen();

if (args.length > 0) {
const result = await waDb.query(
query,
toSqliteBindings(args),
);
return result.rows.map((row: unknown[]) => {
const obj: Record<string, unknown> = {};
for (
let i = 0;
i < result.columns.length;
i++
) {
obj[result.columns[i]] = row[i];
}
return obj;
}) as TRow[];
}
// Use exec for non-parameterized queries since
// @rivetkit/sqlite's query() can crash on some statements.
const results: Record<string, unknown>[] = [];
let columnNames: string[] | null = null;
await waDb.exec(
await mutex.run(async () => { ensureOpen(); });

if (args.length > 0) {
const result = await waDb.query(
query,
(row: unknown[], columns: string[]) => {
if (!columnNames) {
columnNames = columns;
}
const obj: Record<string, unknown> = {};
for (let i = 0; i < row.length; i++) {
obj[columnNames[i]] = row[i];
}
results.push(obj);
},
toSqliteBindings(args),
);
return results as TRow[];
});
return result.rows.map((row: unknown[]) => {
const obj: Record<string, unknown> = {};
for (
let i = 0;
i < result.columns.length;
i++
) {
obj[result.columns[i]] = row[i];
}
return obj;
}) as TRow[];
}
// Use exec for non-parameterized queries since
// @rivetkit/sqlite's query() can crash on some statements.
const results: Record<string, unknown>[] = [];
let columnNames: string[] | null = null;
await waDb.exec(
query,
(row: unknown[], columns: string[]) => {
if (!columnNames) {
columnNames = columns;
}
const obj: Record<string, unknown> = {};
for (let i = 0; i < row.length; i++) {
obj[columnNames[i]] = row[i];
}
results.push(obj);
},
);
return results as TRow[];
},
close: async () => {
await mutex.run(async () => {
if (closed) {
return;
}
const shouldClose = await mutex.run(async () => {
if (closed) return false;
closed = true;
return true;
});
if (shouldClose) {
await waDb.close();
waDbInstance = null;
});
}
},
} satisfies RawAccess);
},
onMigrate: async (_client) => {
if (config?.migrations && waDbInstance) {
await runInlineMigrations(
waDbInstance,
mutex,
config.migrations,
);
}
Expand Down
Loading
Loading