Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 258 additions & 0 deletions packages/realm-server/lib/module-cache-coordination.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
import { createHash } from 'crypto';
import {
logger,
MODULE_CACHE_POPULATED_CHANNEL,
param,
type Expression,
type PgPrimitive,
type PopulateCoordinator,
} from '@cardstack/runtime-common';
import type { PgAdapter } from '@cardstack/postgres';
import { WorkLoop } from '@cardstack/postgres';

const log = logger('realm-server:module-cache-coordination');
// WorkLoop wake-up cadence used when deps.pollIntervalMs is not
// supplied. The loop mostly sleeps; waking once a minute is just a
// liveness heartbeat for the LISTEN connection.
const DEFAULT_POLL_INTERVAL_MS = 60_000;

// Map an arbitrary coalesce key onto a stable signed 64-bit integer,
// rendered as a decimal string, for use as a pg advisory-lock key.
// Same recipe as hashRealmUrlForAdvisoryLock — sha256, first 8 digest
// bytes read as a big-endian signed int64, stringified — so the full
// int64 range is utilized and callers never need BigInt-aware
// parameter binding. Realm-write locks and coalesce locks share pg's
// single advisory-lock namespace; collision probability is negligible
// at any realistic scale.
function hashCoalesceKeyForAdvisoryLock(key: string): string {
  const digestBytes = createHash('sha256').update(key).digest();
  const lockKey = digestBytes.readBigInt64BE(0);
  return `${lockKey}`;
}

// Implements PopulateCoordinator (CS-10953). Owns:
// - a dedicated LISTEN connection on `module_cache_populated` (via
// `PgAdapter.listen`, same pattern as RealmFileChangesListener);
// - a Map<coalesceKey, Set<waiter>> that NOTIFY payloads dispatch into;
// - a `tryAcquireAndRun` method that opens a pinned pool connection,
// attempts a non-blocking advisory xact-lock keyed on the hash of
// the coalesce key, and (if won) runs `fn` inside the BEGIN/COMMIT
// window with a `pg_notify` emit between persist and commit.
//
// The pinned connection ONLY holds the advisory lock and emits the
// NOTIFY. The `fn` callback issues its DB work (readFromDatabaseCache,
// persistModuleCacheEntry) through the shared dbAdapter — a separate
// pool connection autocommits each query as today. Pool pressure is
// bounded by N processes (each pins one extra client per concurrent
// coordinated load) rather than N × M concurrent callers, since the
// in-process #inFlight coalescer fans all same-key callers into one
// coordinated load per process.
//
// `pg_try_advisory_xact_lock` is non-blocking by design: a blocking
// `pg_advisory_xact_lock` would hold a pool client for the full
// prerender wall time (up to 150s in production) on every loser,
// quickly exhausting the pool. Losers release their pinned connection
// immediately on contention and instead wait on NOTIFY — much cheaper
// to multiplex.
//
// Behavior at N=1: the try-lock always succeeds uncontended; loser
// path is never taken; self-NOTIFY is dropped because there are no
// waiters registered.
//
// Sqlite/in-memory deployments don't construct a coordinator — when
// `dbAdapter.kind !== 'pg'` the realm-server `main.ts` skips
// constructing this and CachingDefinitionLookup runs its uncoordinated
// path.
// Constructor dependencies for ModuleCacheCoordinator.
export interface ModuleCacheCoordinatorDeps {
  // Shared pg adapter; provides both the dedicated LISTEN connection
  // (via listen) and pinned pool connections (via withConnection).
  dbAdapter: PgAdapter;
  // Optional for tests. Overrides DEFAULT_POLL_INTERVAL_MS for the
  // internal WorkLoop.
  pollIntervalMs?: number;
}

// A caller parked in waitForKey. resolve is idempotent: it settles the
// waiter's promise and unregisters it, whether invoked by NOTIFY
// dispatch, timeout, or shutdown.
interface KeyWaiter {
  resolve: () => void;
}

// Cross-process coordinator for module-cache population (see the
// design comment above). One process wins a pg advisory xact-lock per
// coalesce key and runs the populate; peers park in #waiters until a
// NOTIFY on MODULE_CACHE_POPULATED_CHANNEL (or a timeout) wakes them
// to re-read the cache.
export class ModuleCacheCoordinator implements PopulateCoordinator {
  #deps: ModuleCacheCoordinatorDeps;
  // Long-lived loop that owns the dedicated LISTEN connection.
  #loop: WorkLoop;
  // Guards start() idempotence.
  #started = false;
  // coalesceKey -> waiters currently parked in waitForKey on that key.
  #waiters = new Map<string, Set<KeyWaiter>>();

  constructor(deps: ModuleCacheCoordinatorDeps) {
    this.#deps = deps;
    this.#loop = new WorkLoop(
      'module-cache-populated',
      deps.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS,
    );
  }

  // Idempotently begin LISTENing on MODULE_CACHE_POPULATED_CHANNEL.
  // The notification callback dispatches each payload into #waiters;
  // the third callback parks the work loop asleep until shutDown()
  // flips shuttingDown, keeping the LISTEN connection alive meanwhile.
  start(): void {
    if (this.#started) {
      return;
    }
    this.#started = true;
    this.#loop.run(async (loop) => {
      await this.#deps.dbAdapter.listen(
        MODULE_CACHE_POPULATED_CHANNEL,
        (notification: { payload?: string }) => {
          this.#dispatch(notification.payload);
        },
        async () => {
          while (!loop.shuttingDown) {
            await loop.sleep();
          }
        },
      );
    });
  }

  async shutDown(): Promise<void> {
    // Resolve any waiters still parked so callers don't hang forever
    // during a clean shutdown. Their loops will re-read the cache on
    // wake and either return the row or surface a transient miss as a
    // normal undefined.
    for (let waiters of this.#waiters.values()) {
      for (let waiter of waiters) {
        waiter.resolve();
      }
    }
    this.#waiters.clear();
    await this.#loop.shutDown();
  }

  // PopulateCoordinator — winner path. Pins one pool connection for
  // the duration of the transaction; non-blocking try-lock means a
  // loser releases that connection immediately (returning
  // { acquired: false }) instead of queueing on the lock. On a win,
  // `fn` runs inside the BEGIN/COMMIT window and a pg_notify for the
  // coalesce key is emitted before COMMIT so peers only see it once
  // the lock releases.
  async tryAcquireAndRun<T>(
    coalesceKey: string,
    fn: () => Promise<T>,
  ): Promise<{ acquired: true; result: T } | { acquired: false }> {
    const lockKey = hashCoalesceKeyForAdvisoryLock(coalesceKey);
    return await this.#deps.dbAdapter.withConnection(async (queryFn) => {
      await queryFn(['BEGIN']);
      let lockResult: Record<string, PgPrimitive>[];
      try {
        lockResult = await queryFn([
          'SELECT pg_try_advisory_xact_lock(',
          param(lockKey),
          '::bigint) AS got',
        ]);
      } catch (err: unknown) {
        await this.#safeRollback(queryFn);
        throw err;
      }
      const got = lockResult[0]?.got === true;
      if (!got) {
        // Contended. Release the pool client immediately so the loser
        // doesn't hold a pinned connection for the duration of the
        // peer's prerender (could be many seconds; would exhaust the
        // pool under N>>1 concurrency).
        await this.#safeRollback(queryFn);
        return { acquired: false };
      }
      try {
        const result = await fn();
        // Emit pg_notify INSIDE the same transaction as the lock so
        // peers only see the signal on commit. The persist itself ran
        // through the shared dbAdapter (already autocommitted), so the
        // row is visible by the time peers re-read on wake.
        //
        // We notify regardless of whether `fn` produced a row or
        // undefined — a "no row" outcome (all populationCandidates
        // produced missing-module errors, or generation-changed
        // returned post-invalidate state) still wants to wake peers
        // promptly so they don't sit on the COALESCE_NOTIFY_WAIT_MS
        // timeout. Peers re-read the cache on wake; if it's empty,
        // they return undefined and the user sees the same answer
        // we returned, just faster than a missed-NOTIFY would.
        await queryFn([
          'SELECT pg_notify(',
          param(MODULE_CACHE_POPULATED_CHANNEL),
          ',',
          param(coalesceKey),
          ')',
        ]);
        await queryFn(['COMMIT']);
        return { acquired: true, result };
      } catch (err: unknown) {
        // NOTE(review): on `fn` failure peers are NOT notified (the
        // rollback discards the pg_notify); they wake via timeout.
        await this.#safeRollback(queryFn);
        throw err;
      }
    });
  }

  // PopulateCoordinator — loser path. Resolves on either NOTIFY or
  // timeout; the caller's outer loop re-reads the cache regardless.
  // Registration and settle are symmetric: settle is idempotent and
  // removes this waiter from #waiters however it was triggered.
  async waitForKey(coalesceKey: string, timeoutMs: number): Promise<void> {
    return await new Promise((resolve) => {
      let resolved = false;
      const settle = () => {
        if (resolved) return;
        resolved = true;
        clearTimeout(timer);
        // Best-effort waiter unregister so the dispatch path doesn't
        // leak a stale entry. If we already left via NOTIFY the entry
        // is already gone; if we left via timeout we need to clean up.
        const set = this.#waiters.get(coalesceKey);
        if (set) {
          set.delete(waiter);
          if (set.size === 0) {
            this.#waiters.delete(coalesceKey);
          }
        }
        resolve();
      };
      const waiter: KeyWaiter = { resolve: settle };
      let set = this.#waiters.get(coalesceKey);
      if (!set) {
        set = new Set();
        this.#waiters.set(coalesceKey, set);
      }
      set.add(waiter);
      const timer = setTimeout(settle, timeoutMs);
      // unref so a hung waiter doesn't hold the Node event loop open
      // during shutdown / test teardown. Real workloads won't reach
      // the timeout in healthy operation.
      if (typeof (timer as { unref?: () => void }).unref === 'function') {
        (timer as { unref: () => void }).unref();
      }
    });
  }

  // Exposed for tests. Simulates an incoming NOTIFY payload.
  handleNotification(payload: string | undefined): void {
    this.#dispatch(payload);
  }

  // Wake every waiter registered under the notified coalesce key.
  #dispatch(payload: string | undefined): void {
    if (!payload) {
      return;
    }
    // Payload is the raw inFlightKey (coalesce key). No structure to
    // parse — just look it up in the waiter map and resolve everyone
    // parked on it.
    const set = this.#waiters.get(payload);
    if (!set) {
      return;
    }
    this.#waiters.delete(payload);
    for (let waiter of set) {
      try {
        waiter.resolve();
      } catch (err: unknown) {
        // Defensive: one throwing waiter must not starve the rest.
        log.warn(
          `${MODULE_CACHE_POPULATED_CHANNEL} waiter resolve threw for ${payload}: ${String(err)}`,
        );
      }
    }
  }

  // Best-effort ROLLBACK that never throws; used on every error/loser
  // exit from tryAcquireAndRun so the pinned connection is returned to
  // the pool in a clean state.
  async #safeRollback(
    queryFn: (expr: Expression) => Promise<Record<string, PgPrimitive>[]>,
  ): Promise<void> {
    try {
      await queryFn(['ROLLBACK']);
    } catch (rollbackErr: unknown) {
      // The xact-lock and any in-tx state release when the connection's
      // transaction is aborted — pg auto-rolls back on client release —
      // so we don't have a stale-lock problem. Log for visibility.
      log.warn(
        `ROLLBACK after coordinator error failed: ${String(rollbackErr)}`,
      );
    }
  }
}
Loading
Loading