Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/bot-runner/tests/bot-runner-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ module('timeline handler', () => {

dbAdapter = {
kind: 'pg',
notify: async () => {},
isClosed: false,
execute: async (sql: string, opts?: ExecuteOptions) => {
if (sql.includes('FROM bot_registrations br')) {
Expand Down
7 changes: 7 additions & 0 deletions packages/bot-runner/tests/command-runner-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ module('command runner', () => {
]);
let dbAdapter = {
kind: 'pg',
notify: async () => {},
isClosed: false,
execute: async (sql: string, opts?: ExecuteOptions) => {
if (sql.includes('FROM bot_commands WHERE bot_id =')) {
Expand Down Expand Up @@ -228,6 +229,7 @@ module('command runner', () => {
]);
let dbAdapter = {
kind: 'pg',
notify: async () => {},
isClosed: false,
execute: async (sql: string, opts?: ExecuteOptions) => {
if (sql.includes('FROM bot_commands WHERE bot_id =')) {
Expand Down Expand Up @@ -440,6 +442,7 @@ module('command runner', () => {
]);
let dbAdapter = {
kind: 'pg',
notify: async () => {},
isClosed: false,
execute: async (sql: string, opts?: ExecuteOptions) => {
if (sql.includes('FROM bot_commands WHERE bot_id =')) {
Expand Down Expand Up @@ -534,6 +537,7 @@ module('command runner', () => {
]);
let dbAdapter = {
kind: 'pg',
notify: async () => {},
isClosed: false,
execute: async (sql: string, opts?: ExecuteOptions) => {
if (sql.includes('FROM bot_commands WHERE bot_id =')) {
Expand Down Expand Up @@ -671,6 +675,7 @@ module('command runner', () => {
]);
let dbAdapter = {
kind: 'pg',
notify: async () => {},
isClosed: false,
execute: async (sql: string, opts?: ExecuteOptions) => {
if (sql.includes('FROM bot_commands WHERE bot_id =')) {
Expand Down Expand Up @@ -883,6 +888,7 @@ module('command runner', () => {
]);
let dbAdapter = {
kind: 'pg',
notify: async () => {},
isClosed: false,
execute: async (sql: string, opts?: ExecuteOptions) => {
if (sql.includes('FROM bot_commands WHERE bot_id =')) {
Expand Down Expand Up @@ -1002,6 +1008,7 @@ module('command runner', () => {
]);
let dbAdapter = {
kind: 'pg',
notify: async () => {},
isClosed: false,
execute: async (sql: string, opts?: ExecuteOptions) => {
if (sql.includes('FROM bot_commands WHERE bot_id =')) {
Expand Down
4 changes: 4 additions & 0 deletions packages/host/app/lib/sqlite-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ export default class SQLiteAdapter implements DBAdapter {
return await this.internalExecute(sql, opts);
}

// SQLite has no pub/sub primitive and the host runs a single in-process
// realm with no peers to notify, so this is intentionally a no-op.
async notify(_channel: string, _payload: string): Promise<void> {}

private async internalExecute(sql: string, opts?: ExecuteOptions) {
sql = this.adjustSQL(sql);
return await this.query(sql, opts);
Expand Down
6 changes: 6 additions & 0 deletions packages/postgres/pg-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,12 @@ export class PgAdapter implements DBAdapter {
}
}

async notify(channel: string, payload: string): Promise<void> {
await this.execute('SELECT pg_notify($1, $2)', {
bind: [channel, payload],
});
}

// @deprecated — prefer `subscribe(channel, handler)`. Each call to listen()
// opens its own dedicated Client connection for the duration of `fn`, which
// doesn't scale as the number of LISTEN-using callers grows. subscribe()
Expand Down
1 change: 1 addition & 0 deletions packages/realm-server/tests/prerender-proxy-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module(basename(__filename), function () {
function makeDbAdapter(rows: any[]): DBAdapter {
return {
kind: 'pg',
async notify() {},
isClosed: false,
async execute() {
return rows;
Expand Down
23 changes: 8 additions & 15 deletions packages/realm-server/tests/realm-file-changes-listener-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { module, test } from 'qunit';
import { basename } from 'path';
import type { PgAdapter } from '@cardstack/postgres';
import type { Realm } from '@cardstack/runtime-common';
import { query, param } from '@cardstack/runtime-common';
import { setupDB } from './helpers';
import {
RealmFileChangesListener,
Expand Down Expand Up @@ -164,13 +163,10 @@ module(basename(__filename), function () {
});
await listener.start();
try {
await query(dbAdapter, [
`SELECT pg_notify(`,
param('realm_file_changes'),
`,`,
param(`${realmUrl}:src/greeting.gts`),
`)`,
]);
await dbAdapter.notify(
'realm_file_changes',
`${realmUrl}:src/greeting.gts`,
);

const received = await waitFor(() =>
invalidations.length > 0 ? invalidations : undefined,
Expand All @@ -194,13 +190,10 @@ module(basename(__filename), function () {
});
await listener.start();
try {
await query(dbAdapter, [
`SELECT pg_notify(`,
param('realm_file_changes'),
`,`,
param(`http://x.test/not-mounted/:file.gts`),
`)`,
]);
await dbAdapter.notify(
'realm_file_changes',
`http://x.test/not-mounted/:file.gts`,
);

// Wait for the lookup to be recorded (proves the NOTIFY was received
// and dispatched; the lookup miss then silently drops).
Expand Down
1 change: 1 addition & 0 deletions packages/realm-server/tests/screenshot-card-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ module(basename(__filename), function () {
function makeDbAdapter(): DBAdapter {
return {
kind: 'pg',
async notify() {},
isClosed: false,
async execute() {
return [];
Expand Down
5 changes: 5 additions & 0 deletions packages/runtime-common/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ export interface DBAdapter {
) => Promise<Record<string, PgPrimitive>[]>;
close: () => Promise<void>;
getColumnNames: (tableName: string) => Promise<string[]>;
// Best-effort cross-instance broadcast on a named channel. Backends that
// don't support pub/sub (e.g. in-process SQLite) implement this as a no-op:
// the caller must treat it as fire-and-forget cache-coherency, never as
// delivery-guaranteed messaging.
notify: (channel: string, payload: string) => Promise<void>;
}
16 changes: 7 additions & 9 deletions packages/runtime-common/realm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1125,19 +1125,17 @@ export class Realm {
// same path. Best-effort — failures are logged and swallowed because the
// local write already succeeded and a missed NOTIFY is a bounded cache-
// staleness window (see docs §9 "Cache-invalidation NOTIFY missed"), not
// a correctness failure.
// a correctness failure. Adapters without pub/sub (e.g. SQLite in the
// host/browser context) implement notify as a no-op.
async #notifyFileChange(path: LocalPath): Promise<void> {
try {
await query(this.#dbAdapter, [
`SELECT pg_notify(`,
param(REALM_FILE_CHANGES_CHANNEL),
`,`,
param(`${this.url}:${path}`),
`)`,
]);
await this.#dbAdapter.notify(
REALM_FILE_CHANGES_CHANNEL,
`${this.url}:${path}`,
);
} catch (err: unknown) {
this.#log.warn(
`pg_notify ${REALM_FILE_CHANGES_CHANNEL} failed for ${this.url}:${path}: ${String(err)}`,
`notify ${REALM_FILE_CHANGES_CHANNEL} failed for ${this.url}:${path}: ${String(err)}`,
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ function makeDBAdapter(
): DBAdapter {
return {
kind: 'pg',
notify: async () => {},
isClosed: false,
execute: async (sql: string, opts?: ExecuteOptions) => {
assertion?.(sql, opts);
Expand Down
Loading