Skip to content

Commit 34c8400

Browse files
committed
fix(engine): lockless waitpoint insert for batch items to eliminate lock contention
When processing batchTriggerAndWait items, each batch item was acquiring a Redis lock on the parent run to insert a TaskRunWaitpoint row. With high concurrency (processingConcurrency=50), this caused LockAcquisitionTimeoutError (880 errors/24h in prod), orphaned runs, and stuck parent runs. Since blockRunWithCreatedBatch already transitions the parent to EXECUTING_WITH_WAITPOINTS before items are processed, the per-item lock is unnecessary. The new blockRunWithWaitpointLockless method performs only the idempotent CTE insert and timeout scheduling without acquiring the lock. refs TRI-7837
1 parent dbbe9f7 commit 34c8400

File tree

3 files changed

+111
-11
lines changed

3 files changed

+111
-11
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Reduce lock contention when processing large `batchTriggerAndWait` batches. Previously, each batch item acquired a Redis lock on the parent run to insert a `TaskRunWaitpoint` row, causing `LockAcquisitionTimeoutError` with high concurrency (880 errors/24h in prod). Since `blockRunWithCreatedBatch` already transitions the parent to `EXECUTING_WITH_WAITPOINTS` before items are processed, the per-item lock is unnecessary. The new `blockRunWithWaitpointLockless` method performs only the idempotent CTE insert without acquiring the lock.

internal-packages/run-engine/src/engine/index.ts

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -728,17 +728,32 @@ export class RunEngine {
728728

729729
//triggerAndWait or batchTriggerAndWait
730730
if (resumeParentOnCompletion && parentTaskRunId && taskRun.associatedWaitpoint) {
731-
//this will block the parent run from continuing until this waitpoint is completed (and removed)
732-
await this.waitpointSystem.blockRunWithWaitpoint({
733-
runId: parentTaskRunId,
734-
waitpoints: taskRun.associatedWaitpoint.id,
735-
projectId: taskRun.associatedWaitpoint.projectId,
736-
organizationId: environment.organization.id,
737-
batch,
738-
workerId,
739-
runnerId,
740-
tx: prisma,
741-
});
731+
if (batch) {
732+
// Batch path: lockless insert. The parent is already EXECUTING_WITH_WAITPOINTS
733+
// from blockRunWithCreatedBatch, so we only need to insert the TaskRunWaitpoint
734+
// row without acquiring the parent run lock. This avoids lock contention when
735+
// processing large batches with high concurrency.
736+
await this.waitpointSystem.blockRunWithWaitpointLockless({
737+
runId: parentTaskRunId,
738+
waitpoints: taskRun.associatedWaitpoint.id,
739+
projectId: taskRun.associatedWaitpoint.projectId,
740+
batch,
741+
tx: prisma,
742+
});
743+
} else {
744+
// Single triggerAndWait: acquire the parent run lock to safely transition
745+
// the snapshot and insert the waitpoint
746+
await this.waitpointSystem.blockRunWithWaitpoint({
747+
runId: parentTaskRunId,
748+
waitpoints: taskRun.associatedWaitpoint.id,
749+
projectId: taskRun.associatedWaitpoint.projectId,
750+
organizationId: environment.organization.id,
751+
batch,
752+
workerId,
753+
runnerId,
754+
tx: prisma,
755+
});
756+
}
742757
}
743758

744759
if (taskRun.delayUntil) {

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,85 @@ export class WaitpointSystem {
525525
});
526526
}
527527

528+
/**
529+
* Lockless version of blockRunWithWaitpoint for batch item processing.
530+
*
531+
* When processing batchTriggerAndWait items, blockRunWithCreatedBatch has already
532+
* transitioned the parent run to EXECUTING_WITH_WAITPOINTS before any items are
533+
* processed. Per-item calls to blockRunWithWaitpoint would all compete for the same
534+
* parent run lock just to insert a TaskRunWaitpoint row — causing lock contention
535+
* and LockAcquisitionTimeoutError with large batches.
536+
*
537+
* This method performs only the CTE insert (which is idempotent via ON CONFLICT DO
538+
* NOTHING) and timeout scheduling, without acquiring the parent run lock.
539+
*/
540+
async blockRunWithWaitpointLockless({
541+
runId,
542+
waitpoints,
543+
projectId,
544+
timeout,
545+
spanIdToComplete,
546+
batch,
547+
tx,
548+
}: {
549+
runId: string;
550+
waitpoints: string | string[];
551+
projectId: string;
552+
timeout?: Date;
553+
spanIdToComplete?: string;
554+
batch: { id: string; index?: number };
555+
tx?: PrismaClientOrTransaction;
556+
}): Promise<void> {
557+
const prisma = tx ?? this.$.prisma;
558+
const $waitpoints = typeof waitpoints === "string" ? [waitpoints] : waitpoints;
559+
560+
// Insert the blocking connections and the historical run connections.
561+
// No lock needed: ON CONFLICT DO NOTHING makes concurrent inserts safe,
562+
// and the parent snapshot is already EXECUTING_WITH_WAITPOINTS from
563+
// blockRunWithCreatedBatch.
564+
await prisma.$queryRaw`
565+
WITH inserted AS (
566+
INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex")
567+
SELECT
568+
gen_random_uuid(),
569+
${runId},
570+
w.id,
571+
${projectId},
572+
NOW(),
573+
NOW(),
574+
${spanIdToComplete ?? null},
575+
${batch.id},
576+
${batch.index ?? null}
577+
FROM "Waitpoint" w
578+
WHERE w.id IN (${Prisma.join($waitpoints)})
579+
ON CONFLICT DO NOTHING
580+
RETURNING "waitpointId"
581+
),
582+
connected_runs AS (
583+
INSERT INTO "_WaitpointRunConnections" ("A", "B")
584+
SELECT ${runId}, w.id
585+
FROM "Waitpoint" w
586+
WHERE w.id IN (${Prisma.join($waitpoints)})
587+
ON CONFLICT DO NOTHING
588+
)
589+
SELECT COUNT(*) FROM inserted`;
590+
591+
// Schedule timeout jobs if needed
592+
if (timeout) {
593+
for (const waitpoint of $waitpoints) {
594+
await this.$.worker.enqueue({
595+
id: `finishWaitpoint.${waitpoint}`,
596+
job: "finishWaitpoint",
597+
payload: {
598+
waitpointId: waitpoint,
599+
error: JSON.stringify(timeoutError(timeout)),
600+
},
601+
availableAt: timeout,
602+
});
603+
}
604+
}
605+
}
606+
528607
/**
529608
* Blocks a run with a waitpoint and immediately completes the waitpoint.
530609
*

0 commit comments

Comments
 (0)