Skip to content
1 change: 1 addition & 0 deletions packages/host/config/schema/1779030457123_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
last_known_good_deps BLOB,
markdown TEXT,
timing_diagnostics BLOB,
job_id INTEGER,
PRIMARY KEY ( url, realm_url, type )
);

Expand Down
13 changes: 12 additions & 1 deletion packages/host/tests/helpers/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,13 @@ export async function setupIndex(
indexRows: workingRows,
now,
client,
columnSourceTable: 'boxel_index_working',
});
let productionIndexedCardsExpressions = await indexedCardsExpressions({
indexRows: productionRows,
now,
client,
columnSourceTable: 'boxel_index',
});
let versionExpressions = versionRows.map((r) => asExpressions(r));

Expand Down Expand Up @@ -219,10 +221,17 @@ async function indexedCardsExpressions({
indexRows,
now,
client,
columnSourceTable,
}: {
indexRows: TestIndexRow[];
now: number;
client: DBAdapter;
// Which table's column list to drive the dataObject projection. The
// default `boxel_index` is missing columns that exist only on
// `boxel_index_working` (e.g. `job_id`); when the caller is building
// expressions for a working-table insert, those columns must be
// preserved.
columnSourceTable?: 'boxel_index' | 'boxel_index_working';
}) {
return await Promise.all(
indexRows.map(async (r) => {
Expand Down Expand Up @@ -263,7 +272,9 @@ async function indexedCardsExpressions({
...defaultIndexEntry,
...row,
};
let columnNames = await client.getColumnNames('boxel_index');
let columnNames = await client.getColumnNames(
columnSourceTable ?? 'boxel_index',
);

// Make sure all table columns are present in the data object, even if their value is undefined. This is to assure
// that the order of the columns in the insert statement is consistent for all types of resources
Expand Down
280 changes: 280 additions & 0 deletions packages/host/tests/unit/index-writer-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1728,4 +1728,284 @@ module('Unit | index-writer', function (hooks) {
'card type summary uses last known good card type data',
);
});

test('resumes URLs already processed by a previous attempt of the same job', async function (assert) {
  // Seed the working table with one row stamped job_id 42 — the same id
  // the batch below is created with — so the batch should claim it as
  // already-processed work from a prior attempt.
  let url = `${testRealmURL}1.json`;
  let lastModified = 1700000000;
  await setupIndex(
    adapter,
    [{ realm_url: testRealmURL, current_version: 1 }],
    {
      working: [
        {
          url,
          realm_version: 1,
          realm_url: testRealmURL,
          type: 'instance',
          job_id: 42,
          // Stored in the column as a string; the batch is expected to
          // hand it back as a number (asserted below).
          last_modified: String(lastModified),
          is_deleted: false,
          deps: [],
          types: [],
        },
      ],
      production: [],
    },
  );

  // Same jobId as the seeded row — this models a retry of job 42.
  let batch = await indexWriter.createBatch(new URL(testRealmURL), {
    jobId: 42,
    reservationId: 1,
    priority: 0,
  });
  assert.strictEqual(
    batch.resumedRows.size,
    1,
    'resumed rows from prior attempt of same job_id are loaded',
  );
  assert.strictEqual(
    batch.resumedRows.get(url),
    lastModified,
    'last_modified value is preserved as a number',
  );
  assert.deepEqual(
    batch.invalidations,
    [url],
    'resumed URLs are pre-seeded into the invalidation set so done() promotes them',
  );
});

test('does not resume rows with has_error=true so the retry can re-attempt them', async function (assert) {
  let url = `${testRealmURL}errored.json`;
  await setupIndex(
    adapter,
    [{ realm_url: testRealmURL, current_version: 1 }],
    {
      working: [
        {
          url,
          realm_version: 1,
          realm_url: testRealmURL,
          type: 'instance',
          // Same job id as the batch below — only has_error should keep
          // this row out of resumedRows.
          job_id: 42,
          last_modified: '1700000000',
          is_deleted: false,
          has_error: true,
          error_doc: {
            message: 'transient',
            status: 500,
            additionalErrors: [],
          },
          deps: [],
          types: [],
        },
      ],
      production: [],
    },
  );

  let batch = await indexWriter.createBatch(new URL(testRealmURL), {
    jobId: 42,
    reservationId: 1,
    priority: 0,
  });
  assert.strictEqual(
    batch.resumedRows.size,
    0,
    'errored rows are not resumed — the retry exists to re-attempt them',
  );
  assert.deepEqual(
    batch.invalidations,
    [],
    'errored URLs are not pre-seeded into the invalidation set',
  );
});

test('does not resume rows from a different job', async function (assert) {
  // One working-table row written by job 99; the batch below belongs to
  // job 42, so the resume filter must leave it out.
  let foreignRows = [
    {
      url: `${testRealmURL}from-other-job.json`,
      realm_version: 1,
      realm_url: testRealmURL,
      type: 'instance' as const,
      job_id: 99,
      last_modified: '1700000000',
      is_deleted: false,
      deps: [],
      types: [],
    },
  ];
  await setupIndex(adapter, [{ realm_url: testRealmURL, current_version: 1 }], {
    working: foreignRows,
    production: [],
  });

  // Retry of job 42 — must not pick up job 99's row.
  let batch = await indexWriter.createBatch(new URL(testRealmURL), {
    jobId: 42,
    reservationId: 1,
    priority: 0,
  });
  assert.strictEqual(
    batch.resumedRows.size,
    0,
    'rows tagged with a different job_id are not resumed',
  );
});

test('working rows from another job are visible to dependency-walk queries but not to resumedRows', async function (assert) {
  // The cumulative working table is the source of truth for the
  // reverse-deps walk in `Batch.invalidate` (via
  // `itemsThatReference`). Rows from completed prior batches must
  // stay so subsequent jobs can find them. The `job_id` filter in
  // `loadResumedRows` is what isolates the *current* job's
  // resume-handoff from those rows.
  let otherUrl = `${testRealmURL}other-job-row.json`;
  await setupIndex(
    adapter,
    [{ realm_url: testRealmURL, current_version: 1 }],
    {
      working: [
        {
          url: otherUrl,
          realm_version: 1,
          realm_url: testRealmURL,
          type: 'instance',
          job_id: 99,
          last_modified: '1700000000',
          is_deleted: false,
          deps: [],
          types: [],
        },
      ],
      production: [],
    },
  );

  // Creating a batch for jobId 42 (≠ the row's job_id 99) must neither
  // resume the foreign row nor remove it from the working table.
  let batch = await indexWriter.createBatch(new URL(testRealmURL), {
    jobId: 42,
    reservationId: 1,
    priority: 0,
  });
  assert.strictEqual(
    batch.resumedRows.size,
    0,
    'rows tagged with a different job_id are NOT in resumedRows',
  );
  // Probe the raw table to prove the foreign row survived batch creation.
  let surviving = await adapter.execute(
    'SELECT url FROM boxel_index_working WHERE realm_url = $1',
    { bind: [testRealmURL] },
  );
  assert.deepEqual(
    surviving.map((r) => r.url),
    [otherUrl],
    'cumulative working state is preserved (it is the source for reverse-deps walks)',
  );
});

test('forgetResumedRows drops a URL from resumedRows so future tombstoning is no longer guarded', async function (assert) {
  // Seed one working-table row stamped with the same job id the batch
  // below is created with, so it lands in resumedRows.
  let resumedUrl = `${testRealmURL}1.json`;
  await setupIndex(
    adapter,
    [{ realm_url: testRealmURL, current_version: 1 }],
    {
      working: [
        {
          url: resumedUrl,
          realm_version: 1,
          realm_url: testRealmURL,
          type: 'instance',
          job_id: 42,
          last_modified: '1700000000',
          deps: [],
          types: [],
        },
      ],
      production: [],
    },
  );

  let batch = await indexWriter.createBatch(new URL(testRealmURL), {
    jobId: 42,
    reservationId: 1,
    priority: 0,
  });
  assert.true(
    batch.resumedRows.has(resumedUrl),
    'precondition: row is initially resumed',
  );

  // Dropping the URL removes its resume protection...
  batch.forgetResumedRows([resumedUrl]);
  assert.false(
    batch.resumedRows.has(resumedUrl),
    'after forgetResumedRows the URL is no longer protected',
  );

  // ...and forgetting a URL that was never resumed changes nothing.
  let absentUrl = `${testRealmURL}does-not-exist.json`;
  batch.forgetResumedRows([absentUrl]);
  assert.strictEqual(
    batch.resumedRows.size,
    0,
    'forgetResumedRows on a URL that is not present is a no-op',
  );
});

test('done() promotes resumed rows even though they were never visited in this attempt', async function (assert) {
  let url = `${testRealmURL}1.json`;
  // The doc seeded into the working table; done() should land it in
  // boxel_index verbatim (asserted below).
  let resumedDoc = {
    id: url,
    type: 'card' as const,
    attributes: { name: 'Resumed' },
    meta: { adoptsFrom: { module: rri(`./person`), name: 'Person' } },
  };
  await setupIndex(
    adapter,
    [{ realm_url: testRealmURL, current_version: 1 }],
    {
      working: [
        {
          url,
          realm_version: 1,
          realm_url: testRealmURL,
          type: 'instance',
          // Same job id as the batch below so the row is resumed.
          job_id: 42,
          last_modified: '1700000000',
          is_deleted: false,
          has_error: false,
          deps: [],
          types: [],
          pristine_doc: resumedDoc as LooseCardResource,
          search_doc: { name: 'Resumed' },
        },
      ],
      production: [],
    },
  );

  let batch = await indexWriter.createBatch(new URL(testRealmURL), {
    jobId: 42,
    reservationId: 1,
    priority: 0,
  });
  // Note: no updateEntry / invalidate call — simulating a retry that
  // discovers all its work was already done by the previous attempt.
  await batch.done();

  // Read back from the production table; coerceTypes re-hydrates the
  // JSON columns so deepEqual can compare structured values.
  let [promoted] = await adapter.execute(
    `SELECT pristine_doc, search_doc FROM boxel_index WHERE url = $1`,
    {
      bind: [url],
      coerceTypes: { pristine_doc: 'JSON', search_doc: 'JSON' },
    },
  );
  assert.deepEqual(
    promoted?.pristine_doc,
    resumedDoc,
    'resumed row was promoted to boxel_index by done()',
  );
  assert.deepEqual(
    promoted?.search_doc,
    { name: 'Resumed' },
    'resumed row search_doc landed in boxel_index',
  );
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
exports.shorthands = undefined;

// Stamp the originating job's id on every working-table row so that a
// retry of the same job can identify (and skip) URLs already processed
// by an earlier attempt. The PK is (url, realm_url); job_id is metadata
// only. Nullable so legacy rows, copyFrom paths without JobInfo, and
// non-job test callers can still write.
exports.up = (pgm) => {
pgm.addColumn('boxel_index_working', {
job_id: { type: 'integer' },
});
pgm.createIndex('boxel_index_working', ['realm_url', 'job_id']);
};

exports.down = (pgm) => {
pgm.dropIndex('boxel_index_working', ['realm_url', 'job_id']);
pgm.dropColumn('boxel_index_working', 'job_id');
};
2 changes: 2 additions & 0 deletions packages/realm-server/tests/indexing-event-sink-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function makeRecordingAdapter(): {
async getColumnNames() {
return [];
},
async notify() {},
},
};
}
Expand Down Expand Up @@ -374,6 +375,7 @@ module(basename(__filename), function () {
async getColumnNames() {
return [];
},
async notify() {},
};
let sink = new IndexingEventSink({ flushIntervalMs: 10 });
sink.setAdapter(slowAdapter);
Expand Down
Loading
Loading