Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions packages/postgres/pg-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,38 @@ function configuredPoolMax(): number | undefined {
return Number.isInteger(value) && value > 0 ? value : undefined;
}

/** Callback invoked with each notification delivered on a subscribed channel. */
export type NotificationHandler = (notification: Notification) => void;

/** Handle returned by `PgAdapter.subscribe()` for one registered handler. */
export interface NotificationSubscription {
/**
 * Removes the handler registered by the subscribe() call that produced this
 * subscription. Calling it again after the first call is a no-op. When the
 * removed handler was the last one for its channel, an UNLISTEN is sent on
 * the shared notification client; a failing UNLISTEN is logged rather than
 * rejected.
 */
unsubscribe(): Promise<void>;
}

// One registered subscriber. Each subscribe() pushes its own handler entry
// and holds a reference to it, so unsubscribing removes the exact entry —
// even when the same function reference is subscribed twice.
type HandlerEntry = { fn: NotificationHandler };
// Per-channel state kept alive while at least one subscriber is registered
// or while a LISTEN is being established.
type ChannelState = {
// Entries for the currently registered subscribers of this channel. Empty
// while the initial LISTEN is still in flight, because subscribe() only
// pushes an entry after `establishment` has resolved.
handlers: HandlerEntry[];
// Resolves when LISTEN has succeeded; concurrent subscribers to the same
// channel join the same promise so a LISTEN failure rejects all of them
// atomically rather than leaving later subscribers stranded.
establishment: Promise<void>;
};

export class PgAdapter implements DBAdapter {
readonly kind = 'pg';
#isClosed = false;
private pool: Pool;
private started: Promise<void>;
private config: Config;
// Shared LISTEN connection used by all subscribe() callers. A dedicated
// Client (not Pool-acquired) is required because LISTEN/NOTIFY is
// unreliable on pooled connections — see node-postgres#1543. Lazily
// opened on first subscribe; closed in close().
#notificationClient?: Client;
#notificationClientStarting?: Promise<Client>;
#channels = new Map<string, ChannelState>();

constructor(opts?: { autoMigrate?: boolean; migrationLogging?: boolean }) {
if (opts?.autoMigrate) {
Expand Down Expand Up @@ -81,9 +107,162 @@ export class PgAdapter implements DBAdapter {
log.debug(`closing ${this.url}`);
this.#isClosed = true;
await this.started;
// Resolve any in-flight notification-client startup so we can end the
// resulting Client. Without this await, a close() that races a first
// subscribe() can leave the connection alive after #isClosed flipped to
// true, because the connect() resolves into #notificationClient only
// after we've already returned from close().
let pendingStart = this.#notificationClientStarting;
if (pendingStart) {
try {
await pendingStart;
} catch {
// Startup failed — there's nothing for us to end.
}
}
const client = this.#notificationClient;
this.#notificationClient = undefined;
this.#channels.clear();
if (client) {
try {
await client.end();
} catch (err: unknown) {
log.warn(`failed to end shared notification client: ${String(err)}`);
}
}
await this.pool.end();
}

/**
 * Subscribe a handler to a Postgres NOTIFY channel.
 *
 * Multiple subscribe() callers — across channels, or even on the same
 * channel — share one dedicated Client; the Client is opened lazily on the
 * first subscribe and closed in close(). Concurrent subscribes on the same
 * channel join the same in-flight LISTEN, so a LISTEN failure rejects all
 * racing callers atomically — no caller is ever stranded with a registered
 * handler that the backend isn't actually delivering to.
 *
 * @param channel - Channel name. It is passed through safeName() before
 *   being interpolated into the LISTEN / UNLISTEN statements.
 * @param handler - Callback invoked with each notification received on
 *   `channel`. It is registered only after LISTEN has succeeded.
 * @returns A subscription whose `unsubscribe()` removes just this handler;
 *   UNLISTEN is sent only after the last handler for the channel is removed.
 *   Calling `unsubscribe()` more than once is a no-op.
 * @throws Error with message 'PgAdapter is closed' if the adapter is closed
 *   before the handler is registered. Rejections from opening the shared
 *   client or from the LISTEN query propagate to the caller.
 */
async subscribe(
channel: string,
handler: NotificationHandler,
): Promise<NotificationSubscription> {
await this.started;
// Loop in case the channel state we joined gets torn down by a concurrent
// unsubscribe (or LISTEN failure) while we were still awaiting establishment.
// eslint-disable-next-line no-constant-condition
while (true) {
if (this.#isClosed) {
throw new Error('PgAdapter is closed');
}
const client = await this.#ensureNotificationClient();
// Obtaining the client is asynchronous, so close() may have run in the
// meantime; check again before touching channel state.
if (this.#isClosed) {
throw new Error('PgAdapter is closed');
}
let state = this.#channels.get(channel);
if (!state) {
// First subscriber for this channel: issue the LISTEN and publish the
// pending state right away so concurrent callers join it instead of
// issuing a LISTEN of their own.
const safeChannel = safeName(channel);
const establishment = (async () => {
await client.query(`LISTEN ${safeChannel}`);
})();
const newState: ChannelState = { handlers: [], establishment };
this.#channels.set(channel, newState);
// If LISTEN ultimately rejects, drop the channel from the map so the
// next subscribe gets a fresh attempt rather than re-awaiting a
// permanently-rejected promise. Awaiting subscribers see the rejection
// through their own `await state.establishment` below.
establishment.catch(() => {
if (this.#channels.get(channel) === newState) {
this.#channels.delete(channel);
}
});
state = newState;
}
const joined = state;
await joined.establishment;
// A concurrent unsubscribe may have torn the channel state down between
// when we joined it and when LISTEN resolved. Re-check, retry from the
// top if so.
if (this.#channels.get(channel) !== joined) {
continue;
}
// A fresh wrapper object per call gives unsubscribe() a unique identity to
// remove, even if `handler` itself is registered more than once.
const entry: HandlerEntry = { fn: handler };
joined.handlers.push(entry);
let unsubscribed = false;
return {
unsubscribe: async () => {
// Idempotent: second and later calls do nothing.
if (unsubscribed) {
return;
}
unsubscribed = true;
const cur = this.#channels.get(channel);
// Channel state is already gone (for example close() clears the map),
// so there is nothing left to undo.
if (!cur) {
return;
}
const idx = cur.handlers.indexOf(entry);
if (idx >= 0) {
cur.handlers.splice(idx, 1);
}
// Other handlers still need the channel; keep listening.
if (cur.handlers.length > 0) {
return;
}
// Last handler removed: drop the state first, so a later subscribe()
// finds no entry and starts a fresh LISTEN, then ask the backend to stop
// delivering.
this.#channels.delete(channel);
// UNLISTEN is skipped when no client exists or the adapter is closed;
// close() ends the shared client itself.
if (this.#notificationClient && !this.#isClosed) {
try {
await this.#notificationClient.query(
`UNLISTEN ${safeName(channel)}`,
);
} catch (err: unknown) {
// Best-effort: a failed UNLISTEN is logged, not rethrown.
log.warn(`UNLISTEN ${channel} failed: ${String(err)}`);
}
}
},
};
}
}

/**
 * Returns the shared notification Client, opening it on first use.
 *
 * If a client is already stored it is returned as-is. If a startup is
 * already in flight, the caller receives that same promise, so concurrent
 * callers share one connect() attempt. Otherwise a new Client is created
 * from `this.config`, its 'notification' and 'error' listeners are attached,
 * and it is stored in `#notificationClient` once connect() has resolved.
 *
 * @returns The connected shared Client.
 * @throws Whatever connect() rejects with. In that case `#notificationClient`
 *   stays unset and the in-flight marker is cleared, so the next call makes
 *   a new attempt.
 */
async #ensureNotificationClient(): Promise<Client> {
if (this.#notificationClient) {
return this.#notificationClient;
}
if (this.#notificationClientStarting) {
return this.#notificationClientStarting;
}
this.#notificationClientStarting = (async () => {
const client = new Client(this.config);
client.on('notification', (n) => {
log.debug(`heard pg notification for channel %s`, n.channel);
// Notifications for channels with no registered state are ignored.
const state = this.#channels.get(n.channel);
if (!state) {
return;
}
// Iterate over a copy of the handler list — presumably so that a handler
// which unsubscribes during dispatch does not disturb the iteration.
for (const entry of [...state.handlers]) {
try {
entry.fn(n);
} catch (err: unknown) {
// A throwing handler is logged and does not prevent the remaining
// handlers from running.
log.warn(
`notification handler for channel ${n.channel} threw: ${String(err)}`,
);
}
}
});
client.on('error', (err) => {
// The shared client is the substrate for every subscriber, so a
// disconnect silently kills them all. Surface it loudly. Reconnect
// is not implemented here; current production has not seen this
// path, and the legacy listen() API has the same hazard.
// This listener only logs: #notificationClient and #channels are left
// as they were.
log.error(`shared notification client error: ${String(err)}`);
});
// NOTE(review): if connect() rejects, the Client created above is dropped
// without end() being called — confirm pg releases the socket in that case.
await client.connect();
this.#notificationClient = client;
return client;
})();
try {
return await this.#notificationClientStarting;
} finally {
// Clear the in-flight marker whether startup succeeded or failed; on
// success later callers take the #notificationClient fast path above.
this.#notificationClientStarting = undefined;
}
}

async execute(
sql: string,
opts?: ExecuteOptions,
Expand Down Expand Up @@ -112,6 +291,11 @@ export class PgAdapter implements DBAdapter {
}
}

/**
 * Each call to listen() opens its own dedicated Client connection for the
 * duration of `fn`, which doesn't scale as the number of LISTEN-using callers
 * grows. subscribe() multiplexes all callers onto a single shared Client.
 * This entry point is kept for callers that haven't migrated yet (e.g.
 * pg-queue).
 *
 * @deprecated Prefer `subscribe(channel, handler)`.
 */
async listen(
channel: string,
handler: (notification: Notification) => void,
Expand Down
68 changes: 33 additions & 35 deletions packages/realm-server/lib/realm-file-changes-listener.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import type { Realm } from '@cardstack/runtime-common';
import { logger, REALM_FILE_CHANGES_CHANNEL } from '@cardstack/runtime-common';
import type { PgAdapter } from '@cardstack/postgres';
import { WorkLoop } from '@cardstack/postgres';
import type { PgAdapter, NotificationSubscription } from '@cardstack/postgres';

const log = logger('realm-server:file-changes-listener');
const DEFAULT_POLL_INTERVAL_MS = 60_000;

// Cross-instance cache invalidation. When any realm-server emits
// `NOTIFY realm_file_changes, '<url>:<path>'` (see Realm.#notifyFileChange in
Expand All @@ -14,59 +12,59 @@ const DEFAULT_POLL_INTERVAL_MS = 60_000;
// #moduleCache entries. If it's not mounted, the notification is dropped —
// this instance has no stale state to clear.
//
// The LISTEN is backed by `PgAdapter.listen` (dedicated Client, not pool-
// returned) exactly like the registry reconciler. A poll fallback is not
// strictly needed here — missed NOTIFYs degrade to cache staleness that the
// next write will re-invalidate — but the WorkLoop gives us a predictable
// shutdown path and matches the pattern used elsewhere. We set the poll
// interval to something long (60s) so the fallback loop doesn't burn CPU
// on busy instances; it exists to surface connection health, not to
// re-scan anything.
// The LISTEN is backed by `PgAdapter.subscribe` (shared multiplexed
// notification client). There is no periodic work to run between
// notifications — the whole dispatch is in the payload — so we don't keep a
// WorkLoop here.
export interface RealmFileChangesListenerDeps {
dbAdapter: PgAdapter;
lookupMountedRealm: (url: string) => Realm | undefined;
// Optional for tests.
pollIntervalMs?: number;
}

export class RealmFileChangesListener {
#deps: RealmFileChangesListenerDeps;
#loop: WorkLoop;
#started = false;
#subscription?: NotificationSubscription;
#starting?: Promise<void>;

constructor(deps: RealmFileChangesListenerDeps) {
this.#deps = deps;
this.#loop = new WorkLoop(
'realm-file-changes',
deps.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS,
);
}

start(): void {
if (this.#started) {
async start(): Promise<void> {
if (this.#subscription || this.#starting) {
await this.#starting;
return;
}
this.#started = true;
this.#loop.run(async (loop) => {
await this.#deps.dbAdapter.listen(
this.#starting = (async () => {
this.#subscription = await this.#deps.dbAdapter.subscribe(
REALM_FILE_CHANGES_CHANNEL,
(notification: { payload?: string }) => {
// Invalidate synchronously on wake rather than forcing a reconcile
// pass: there is nothing to poll from the DB side, the whole
// payload is in the notification.
(notification) => {
this.#handleNotification(notification.payload);
},
async () => {
while (!loop.shuttingDown) {
await loop.sleep();
}
},
);
});
})();
try {
await this.#starting;
} finally {
this.#starting = undefined;
}
}

async shutDown(): Promise<void> {
Comment thread
lukemelia marked this conversation as resolved.
await this.#loop.shutDown();
// Wait for any in-flight start() to finish wiring up #subscription before
// tearing down. Otherwise shutDown can run while subscribe() is still
// awaiting the LISTEN, return early with #subscription still undefined,
// and the racing start() then installs a live subscription after we
// thought we were shut down. Swallow start() errors here — if startup
// failed, there's nothing for us to unsubscribe.
try {
await this.#starting;
} catch {
// ignore
}
const sub = this.#subscription;
this.#subscription = undefined;
await sub?.unsubscribe();
}

// Exposed for tests; invoked internally by the LISTEN handler.
Expand Down
47 changes: 34 additions & 13 deletions packages/realm-server/lib/realm-registry-reconciler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Realm } from '@cardstack/runtime-common';
import { logger, param, query } from '@cardstack/runtime-common';
import type { PgAdapter } from '@cardstack/postgres';
import type { PgAdapter, NotificationSubscription } from '@cardstack/postgres';
import { WorkLoop } from '@cardstack/postgres';

const log = logger('realm-server:registry-reconciler');
Expand Down Expand Up @@ -65,6 +65,8 @@ export class RealmRegistryReconciler {
#deps: ReconcilerDeps;
#loop: WorkLoop;
#started = false;
#subscription?: NotificationSubscription;
#starting?: Promise<void>;
knownByUrl = new Map<string, RealmRegistryRow>();
mounted = new Map<string, Realm>();
pendingMounts = new Map<string, Promise<Realm>>();
Expand Down Expand Up @@ -102,28 +104,47 @@ export class RealmRegistryReconciler {
}

// Begin the LISTEN + poll loop. Safe to call once; no-ops on repeat.
start(): void {
async start(): Promise<void> {
if (this.#started) {
await this.#starting;
return;
}
this.#started = true;
this.#loop.run(async (loop) => {
await this.#deps.dbAdapter.listen(
this.#starting = (async () => {
this.#subscription = await this.#deps.dbAdapter.subscribe(
CHANNEL,
loop.wake.bind(loop),
async () => {
// Run one reconcile immediately on start, then loop on wake-or-poll.
while (!loop.shuttingDown) {
await this.#safeReconcile();
await loop.sleep();
}
},
this.#loop.wake.bind(this.#loop),
);
});
this.#loop.run(async (loop) => {
// Run one reconcile immediately on start, then loop on wake-or-poll.
while (!loop.shuttingDown) {
await this.#safeReconcile();
await loop.sleep();
}
});
})();
try {
await this.#starting;
} finally {
this.#starting = undefined;
}
}

async shutDown(): Promise<void> {
// Wait for any in-flight start() to finish wiring up #subscription before
// tearing down. Otherwise shutDown can race a still-pending subscribe()
// and leave a live subscription installed after we thought we were shut
// down. Swallow start() errors here — nothing to unsubscribe if startup
// failed.
try {
await this.#starting;
} catch {
// ignore
}
await this.#loop.shutDown();
const sub = this.#subscription;
this.#subscription = undefined;
await sub?.unsubscribe();
Comment thread
lukemelia marked this conversation as resolved.
}

async #safeReconcile(): Promise<void> {
Expand Down
Loading
Loading