Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/fix-batch-waitpoint-lock-contention.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Reduce lock contention when processing large `batchTriggerAndWait` batches. Previously, each batch item acquired a Redis lock on the parent run to insert a `TaskRunWaitpoint` row, causing `LockAcquisitionTimeoutError` under high concurrency (880 errors/24h in prod). Since `blockRunWithCreatedBatch` already transitions the parent to `EXECUTING_WITH_WAITPOINTS` before items are processed, the per-item lock is unnecessary. The new `blockRunWithWaitpointLockless` method performs only the idempotent CTE insert (plus timeout scheduling when a timeout is supplied) without acquiring the lock.
37 changes: 26 additions & 11 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -728,17 +728,32 @@ export class RunEngine {

//triggerAndWait or batchTriggerAndWait
if (resumeParentOnCompletion && parentTaskRunId && taskRun.associatedWaitpoint) {
//this will block the parent run from continuing until this waitpoint is completed (and removed)
await this.waitpointSystem.blockRunWithWaitpoint({
runId: parentTaskRunId,
waitpoints: taskRun.associatedWaitpoint.id,
projectId: taskRun.associatedWaitpoint.projectId,
organizationId: environment.organization.id,
batch,
workerId,
runnerId,
tx: prisma,
});
if (batch) {
// Batch path: lockless insert. The parent is already EXECUTING_WITH_WAITPOINTS
// from blockRunWithCreatedBatch, so we only need to insert the TaskRunWaitpoint
// row without acquiring the parent run lock. This avoids lock contention when
// processing large batches with high concurrency.
await this.waitpointSystem.blockRunWithWaitpointLockless({
runId: parentTaskRunId,
waitpoints: taskRun.associatedWaitpoint.id,
projectId: taskRun.associatedWaitpoint.projectId,
batch,
tx: prisma,
});
} else {
// Single triggerAndWait: acquire the parent run lock to safely transition
// the snapshot and insert the waitpoint
await this.waitpointSystem.blockRunWithWaitpoint({
runId: parentTaskRunId,
waitpoints: taskRun.associatedWaitpoint.id,
projectId: taskRun.associatedWaitpoint.projectId,
organizationId: environment.organization.id,
batch,
workerId,
runnerId,
tx: prisma,
});
}
}

if (taskRun.delayUntil) {
Expand Down
79 changes: 79 additions & 0 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,85 @@ export class WaitpointSystem {
});
}

/**
 * Lockless version of blockRunWithWaitpoint for batch item processing.
 *
 * When processing batchTriggerAndWait items, blockRunWithCreatedBatch has already
 * transitioned the parent run to EXECUTING_WITH_WAITPOINTS before any items are
 * processed. Per-item calls to blockRunWithWaitpoint would all compete for the same
 * parent run lock just to insert a TaskRunWaitpoint row — causing lock contention
 * and LockAcquisitionTimeoutError with large batches.
 *
 * This method only runs the CTE insert and (optionally) schedules timeout jobs. It
 * never acquires the parent run lock, so callers must only use it when the parent
 * run has already been moved to a waiting state by other means.
 *
 * NOTE(review): both inserts use ON CONFLICT DO NOTHING, so repeated calls are
 * intended to be idempotent. That relies on unique constraints on
 * "TaskRunWaitpoint" and "_WaitpointRunConnections" which are not visible from
 * this method — confirm against the schema.
 *
 * @param runId - ID of the run to block (written to "TaskRunWaitpoint"."taskRunId").
 * @param waitpoints - A single waitpoint ID or a list of IDs. Only IDs that exist in
 *   the "Waitpoint" table produce rows. An empty list is a no-op.
 * @param projectId - Project ID stored on each inserted "TaskRunWaitpoint" row.
 * @param timeout - When set, a "finishWaitpoint" job is enqueued for every waitpoint,
 *   made available at this time and carrying a serialized timeout error.
 * @param spanIdToComplete - Optional span ID stored on each inserted row (NULL if omitted).
 * @param batch - Batch ID (and optional index) stored on each inserted row.
 * @param tx - Prisma client or transaction to run the insert on; defaults to the
 *   system's own client.
 */
async blockRunWithWaitpointLockless({
  runId,
  waitpoints,
  projectId,
  timeout,
  spanIdToComplete,
  batch,
  tx,
}: {
  runId: string;
  waitpoints: string | string[];
  projectId: string;
  timeout?: Date;
  spanIdToComplete?: string;
  batch: { id: string; index?: number };
  tx?: PrismaClientOrTransaction;
}): Promise<void> {
  const $waitpoints = typeof waitpoints === "string" ? [waitpoints] : waitpoints;

  // Nothing to insert or schedule. Bail out before building the query, because
  // Prisma.join() rejects an empty list and would throw instead of producing SQL.
  if ($waitpoints.length === 0) {
    return;
  }

  const prisma = tx ?? this.$.prisma;

  // Insert the blocking connections and the historical run connections.
  // No lock needed: ON CONFLICT DO NOTHING makes concurrent inserts safe,
  // and the parent snapshot is already EXECUTING_WITH_WAITPOINTS from
  // blockRunWithCreatedBatch.
  // The trailing COUNT(*) result is not used; it only gives the CTE a final SELECT.
  await prisma.$queryRaw`
    WITH inserted AS (
      INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex")
      SELECT
        gen_random_uuid(),
        ${runId},
        w.id,
        ${projectId},
        NOW(),
        NOW(),
        ${spanIdToComplete ?? null},
        ${batch.id},
        ${batch.index ?? null}
      FROM "Waitpoint" w
      WHERE w.id IN (${Prisma.join($waitpoints)})
      ON CONFLICT DO NOTHING
      RETURNING "waitpointId"
    ),
    connected_runs AS (
      INSERT INTO "_WaitpointRunConnections" ("A", "B")
      SELECT ${runId}, w.id
      FROM "Waitpoint" w
      WHERE w.id IN (${Prisma.join($waitpoints)})
      ON CONFLICT DO NOTHING
    )
    SELECT COUNT(*) FROM inserted`;

  // Schedule timeout jobs if needed. The job id is derived from the waitpoint id,
  // so each waitpoint gets its own "finishWaitpoint" job.
  if (timeout) {
    for (const waitpoint of $waitpoints) {
      await this.$.worker.enqueue({
        id: `finishWaitpoint.${waitpoint}`,
        job: "finishWaitpoint",
        payload: {
          waitpointId: waitpoint,
          error: JSON.stringify(timeoutError(timeout)),
        },
        availableAt: timeout,
      });
    }
  }
}

/**
* Blocks a run with a waitpoint and immediately completes the waitpoint.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { trace } from "@internal/tracing";
import { Logger } from "@trigger.dev/core/logger";

describe("RunLocker", () => {
redisTest("Test acquiring a lock works", { timeout: 15_000 }, async ({ redisOptions }) => {
redisTest("Test acquiring a lock works", { timeout: 60_000 }, async ({ redisOptions }) => {
const redis = createRedisClient(redisOptions);
const logger = new Logger("RunLockTest", "debug");
const runLock = new RunLocker({
Expand Down
Loading