
Commit 3d2bd66

Fix: requeue a run if the DB is unavailable during dequeuing
In the DequeueSystem, if the database was unavailable we would dequeue from Redis and then fail to requeue in the error handler, because requeuing required DB access. Now, when the `catch` block encounters a DB error, we requeue directly via Redis, putting the message back in the queue.
1 parent 4093883 commit 3d2bd66
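
In other words, the error handler now falls back to a Redis-only path: if the run record cannot be read from Postgres, the message is nacked straight through the run queue instead of going through the DB-dependent requeue. Below is a minimal sketch of that pattern; it uses the real `tryCatch` import shown in the diff further down, but the surrounding function, parameters, and `RunQueueLike` type are hypothetical stand-ins for illustration only.

import { tryCatch } from "@trigger.dev/core";

// Hypothetical stand-in for the Redis-backed run queue used in this sketch.
type RunQueueLike = {
  nackMessage(args: { orgId: string; messageId: string }): Promise<void>;
};

// Sketch of the fallback: try to load the run, and if the DB read fails (or the
// run is missing), skip the DB-dependent requeue and nack directly via Redis.
async function nackIfRunUnavailable(
  runId: string,
  orgId: string,
  loadRun: (id: string) => Promise<unknown | null>,
  runQueue: RunQueueLike
): Promise<boolean> {
  const [findError, run] = await tryCatch(loadRun(runId));

  if (findError || !run) {
    await runQueue.nackMessage({ orgId, messageId: runId });
    return true; // message was put back on the queue via Redis
  }

  return false; // caller continues with the normal, DB-backed path
}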

File tree

2 files changed: 147 additions, 11 deletions

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 14 additions & 10 deletions
@@ -1,6 +1,6 @@
 import type { BillingCache } from "../billingCache.js";
 import { startSpan } from "@internal/tracing";
-import { assertExhaustive } from "@trigger.dev/core";
+import { assertExhaustive, tryCatch } from "@trigger.dev/core";
 import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3";
 import { placementTag } from "@trigger.dev/core/v3/serverOnly";
 import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
@@ -611,20 +611,24 @@ export class DequeueSystem {
        }
      );
 
-      const run = await prisma.taskRun.findFirst({
-        where: { id: runId },
-        include: {
-          runtimeEnvironment: true,
-        },
-      });
+      // Wrap the Prisma call with tryCatch - if DB is unavailable, we still want to nack via Redis
+      const [findError, run] = await tryCatch(
+        prisma.taskRun.findFirst({
+          where: { id: runId },
+          include: {
+            runtimeEnvironment: true,
+          },
+        })
+      );
 
-      if (!run) {
-        //this isn't ideal because we're not creating a snapshot… but we can't do much else
+      // If DB is unavailable or run not found, just nack directly via Redis
+      if (findError || !run) {
        this.$.logger.error(
-          "RunEngine.dequeueFromWorkerQueue(): Thrown error, then run not found. Nacking.",
+          "RunEngine.dequeueFromWorkerQueue(): Failed to find run, nacking directly via Redis",
          {
            runId,
            orgId,
+            findError,
          }
        );
        await this.$.runQueue.nackMessage({ orgId, messageId: runId });
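
For context, the `tryCatch` helper imported from `@trigger.dev/core` is used above as `const [findError, run] = await tryCatch(promise)`, i.e. it resolves to an [error, result] tuple instead of throwing. A hedged sketch of a helper with that shape follows; the actual implementation and error typing in `@trigger.dev/core` may differ.

// Assumed shape, inferred from the destructuring in the diff above; the real
// @trigger.dev/core implementation may differ in details such as error typing.
async function tryCatch<T, E = Error>(
  promise: Promise<T>
): Promise<[E, null] | [null, T]> {
  try {
    const result = await promise;
    return [null, result];
  } catch (error) {
    return [error as E, null];
  }
}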

internal-packages/run-engine/src/engine/tests/dequeuing.test.ts

Lines changed: 133 additions & 1 deletion
@@ -1,9 +1,10 @@
-import { containerTest } from "@internal/testcontainers";
+import { assertNonNullable, containerTest } from "@internal/testcontainers";
 import { trace } from "@internal/tracing";
 import { DequeuedMessage } from "@trigger.dev/core/v3";
 import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic";
 import { PrismaClientOrTransaction } from "@trigger.dev/database";
 import { expect } from "vitest";
+import { setTimeout } from "node:timers/promises";
 import { MinimalAuthenticatedEnvironment } from "../../shared/index.js";
 import { RunEngine } from "../index.js";
 import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js";
@@ -79,6 +80,137 @@ describe("RunEngine dequeuing", () => {
       await engine.quit();
     }
   });
+
+  containerTest(
+    "Direct nack after dequeue clears concurrency and allows recovery",
+    async ({ prisma, redisOptions }) => {
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      // Use a short heartbeat timeout so the stalled system recovers the run quickly
+      const pendingExecutingTimeout = 1000;
+
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+        },
+        queue: {
+          redis: redisOptions,
+          masterQueueConsumersDisabled: true,
+          processWorkerQueueDebounceMs: 50,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0005,
+        },
+        heartbeatTimeoutsMs: {
+          PENDING_EXECUTING: pendingExecutingTimeout,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      try {
+        const taskIdentifier = "test-task";
+
+        // Setup background worker
+        await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
+
+        // Trigger a single run
+        const runs = await triggerRuns({
+          engine,
+          environment: authenticatedEnvironment,
+          taskIdentifier,
+          prisma,
+          count: 1,
+        });
+        expect(runs.length).toBe(1);
+        const run = runs[0];
+
+        // Process master queue to move run to worker queue
+        await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id, 1);
+
+        // Wait for processing
+        await setTimeout(500);
+
+        // Dequeue from worker queue - this puts run in concurrency sets and creates PENDING_EXECUTING snapshot
+        const dequeued = await engine.dequeueFromWorkerQueue({
+          consumerId: "test_12345",
+          workerQueue: "main",
+        });
+        expect(dequeued.length).toBe(1);
+        assertNonNullable(dequeued[0]);
+
+        // Verify run is in PENDING_EXECUTING state
+        const executionDataBefore = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionDataBefore);
+        expect(executionDataBefore.snapshot.executionStatus).toBe("PENDING_EXECUTING");
+
+        // Verify run is in concurrency
+        const envConcurrencyBefore = await engine.runQueue.currentConcurrencyOfEnvironment(
+          authenticatedEnvironment
+        );
+        expect(envConcurrencyBefore).toBe(1);
+
+        // Simulate DB failure fallback: call nackMessage directly via Redis
+        // This is what happens when the catch block can't read from Postgres
+        await engine.runQueue.nackMessage({
+          orgId: authenticatedEnvironment.organization.id,
+          messageId: run.id,
+        });
+
+        // Verify concurrency is cleared - this is the key fix!
+        // Without this fix, the run would stay in concurrency sets forever
+        const envConcurrencyAfter = await engine.runQueue.currentConcurrencyOfEnvironment(
+          authenticatedEnvironment
+        );
+        expect(envConcurrencyAfter).toBe(0);
+
+        // Verify the message is back in the queue
+        const envQueueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment);
+        expect(envQueueLength).toBe(1);
+
+        // Wait for the stalled system to detect and recover the PENDING_EXECUTING run
+        // The stalled system will call tryNackAndRequeue which updates Postgres state to QUEUED
+        await setTimeout(pendingExecutingTimeout * 5);
+
+        // Verify the stalled system recovered the run to QUEUED state
+        const executionDataAfterStall = await engine.getRunExecutionData({ runId: run.id });
+        assertNonNullable(executionDataAfterStall);
+        expect(executionDataAfterStall.snapshot.executionStatus).toBe("QUEUED");
+
+        // Process master queue to move the run from env queue to worker queue
+        await engine.runQueue.processMasterQueueForEnvironment(authenticatedEnvironment.id, 1);
+
+        // Wait for processing
+        await setTimeout(500);
+
+        // Dequeue from worker queue - the run should now be available
+        const dequeuedAgain = await engine.dequeueFromWorkerQueue({
+          consumerId: "test_12345",
+          workerQueue: "main",
+        });
+        expect(dequeuedAgain.length).toBe(1);
+        assertNonNullable(dequeuedAgain[0]);
+        expect(dequeuedAgain[0].run.id).toBe(run.id);
+      } finally {
+        await engine.quit();
+      }
+    }
+  );
 });
 
 async function triggerRuns({
