Skip to content

Commit 6055c7d

Browse files
matt-aitkengithub-actions[bot]ericallam
authored
Recover runs that failed to dequeue (#2931)
There’s an edge case that means runs can end up in the currentConcurrency set when they’re not in the correct state for execution. This means they will be permanently stuck in queued. Given an environmentId this will fix those runs. This is a temporary fix while we permanently fix the issue. --------- Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com> Co-authored-by: Eric Allam <ericallam@users.noreply.github.com>
1 parent cf1c311 commit 6055c7d

File tree

1 file changed

+367
-0
lines changed

1 file changed

+367
-0
lines changed

scripts/recover-stuck-runs.ts

Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
#!/usr/bin/env tsx
2+
3+
/**
4+
* Recovery script for runs stuck in currentConcurrency with QUEUED execution status
5+
*
6+
* PROBLEM:
7+
* During high database load, runs can get dequeued from Redis (added to currentConcurrency)
8+
* but fail to update their execution status in the database. This leaves them stuck in an
9+
* inconsistent state where they won't be re-dequeued because they're marked as "in progress"
10+
* in Redis, but their database state still shows QUEUED.
11+
*
12+
* SOLUTION:
13+
* This script identifies and recovers these stuck runs by:
14+
* 1. Reading from the environment currentConcurrency Redis set
15+
* 2. Checking which runs have QUEUED execution status (inconsistent state)
16+
* 3. Re-adding them to their specific queue sorted sets
17+
* 4. Removing them from the queue-specific currentConcurrency sets
18+
* 5. Removing them from the environment-level currentConcurrency set
19+
*
20+
* SAFETY:
21+
* - Dry-run mode when no write Redis URL is provided (read-only, no writes)
22+
* - Uses separate Redis connections for reads and writes
23+
* - Write connection only created when redisWriteUrl is provided
24+
*
25+
* ARGUMENTS:
26+
* <environmentId> The Trigger.dev environment ID (e.g., env_abc123)
27+
* <postgresUrl> PostgreSQL connection string
28+
* <redisReadUrl> Redis connection string for reads (redis:// or rediss://)
29+
* [redisWriteUrl] Optional Redis connection string for writes (omit for dry-run)
30+
*
31+
* USAGE:
32+
* tsx scripts/recover-stuck-runs.ts <environmentId> <postgresUrl> <redisReadUrl> [redisWriteUrl]
33+
*
34+
* EXAMPLES:
35+
*
36+
* Dry-run mode (safe, no writes):
37+
* tsx scripts/recover-stuck-runs.ts env_1234567890 \
38+
* "postgresql://user:pass@localhost:5432/triggerdev" \
39+
* "redis://readonly.example.com:6379"
40+
*
41+
* Execute mode (makes actual changes):
42+
* tsx scripts/recover-stuck-runs.ts env_1234567890 \
43+
* "postgresql://user:pass@localhost:5432/triggerdev" \
44+
* "redis://readonly.example.com:6379" \
45+
* "redis://writeonly.example.com:6379"
46+
*/
47+
48+
import { PrismaClient, TaskRunExecutionStatus } from "@trigger.dev/database";
49+
import { createRedisClient } from "@internal/redis";
50+
51+
interface StuckRun {
52+
runId: string;
53+
orgId: string;
54+
projectId: string;
55+
environmentId: string;
56+
queue: string;
57+
concurrencyKey: string | null;
58+
executionStatus: TaskRunExecutionStatus;
59+
snapshotCreatedAt: Date;
60+
taskIdentifier: string;
61+
}
62+
63+
interface RedisOperation {
64+
type: "ZADD" | "SREM";
65+
key: string;
66+
args: (string | number)[];
67+
description: string;
68+
}
69+
70+
async function main() {
71+
const [environmentId, postgresUrl, redisReadUrl, redisWriteUrl] = process.argv.slice(2);
72+
73+
if (!environmentId || !postgresUrl || !redisReadUrl) {
74+
console.error("Usage: tsx scripts/recover-stuck-runs.ts <environmentId> <postgresUrl> <redisReadUrl> [redisWriteUrl]");
75+
console.error("");
76+
console.error("Dry-run mode when no redisWriteUrl is provided (read-only).");
77+
console.error("Execute mode when redisWriteUrl is provided (makes actual changes).");
78+
console.error("");
79+
console.error("Example (dry-run):");
80+
console.error(' tsx scripts/recover-stuck-runs.ts env_1234567890 \\');
81+
console.error(' "postgresql://user:pass@localhost:5432/triggerdev" \\');
82+
console.error(' "redis://readonly.example.com:6379"');
83+
console.error("");
84+
console.error("Example (execute):");
85+
console.error(' tsx scripts/recover-stuck-runs.ts env_1234567890 \\');
86+
console.error(' "postgresql://user:pass@localhost:5432/triggerdev" \\');
87+
console.error(' "redis://readonly.example.com:6379" \\');
88+
console.error(' "redis://writeonly.example.com:6379"');
89+
process.exit(1);
90+
}
91+
92+
const executeMode = !!redisWriteUrl;
93+
94+
if (executeMode) {
95+
console.log("⚠️ EXECUTE MODE - Changes will be made to Redis\n");
96+
} else {
97+
console.log("🔍 DRY RUN MODE - No changes will be made to Redis\n");
98+
}
99+
100+
console.log(`🔍 Scanning for stuck runs in environment: ${environmentId}`);
101+
102+
// Create Prisma client with the provided connection URL
103+
const prisma = new PrismaClient({
104+
datasources: {
105+
db: {
106+
url: postgresUrl,
107+
},
108+
},
109+
});
110+
111+
try {
112+
// Get environment details
113+
const environment = await prisma.runtimeEnvironment.findUnique({
114+
where: { id: environmentId },
115+
include: {
116+
organization: true,
117+
project: true,
118+
},
119+
});
120+
121+
if (!environment) {
122+
console.error(`❌ Environment not found: ${environmentId}`);
123+
process.exit(1);
124+
}
125+
126+
console.log(`📍 Environment: ${environment.slug} (${environment.type})`);
127+
console.log(`📍 Organization: ${environment.organization.slug}`);
128+
console.log(`📍 Project: ${environment.project.slug}`);
129+
130+
// Parse Redis read URL
131+
const redisReadUrlObj = new URL(redisReadUrl);
132+
const redisReadOptions = {
133+
host: redisReadUrlObj.hostname,
134+
port: parseInt(redisReadUrlObj.port || "6379"),
135+
username: redisReadUrlObj.username || undefined,
136+
password: redisReadUrlObj.password || undefined,
137+
enableAutoPipelining: false,
138+
...(redisReadUrlObj.protocol === "rediss:"
139+
? {
140+
tls: {
141+
// If connecting via localhost tunnel to a remote Redis, disable cert verification
142+
rejectUnauthorized: redisReadUrlObj.hostname === "localhost" ? false : true,
143+
},
144+
}
145+
: {}),
146+
};
147+
148+
// Create Redis read client
149+
const redisRead = createRedisClient(redisReadOptions);
150+
151+
// Create Redis write client if redisWriteUrl is provided
152+
let redisWrite = null;
153+
if (redisWriteUrl) {
154+
const redisWriteUrlObj = new URL(redisWriteUrl);
155+
const redisWriteOptions = {
156+
host: redisWriteUrlObj.hostname,
157+
port: parseInt(redisWriteUrlObj.port || "6379"),
158+
username: redisWriteUrlObj.username || undefined,
159+
password: redisWriteUrlObj.password || undefined,
160+
enableAutoPipelining: false,
161+
...(redisWriteUrlObj.protocol === "rediss:"
162+
? {
163+
tls: {
164+
// If connecting via localhost tunnel to a remote Redis, disable cert verification
165+
rejectUnauthorized: redisWriteUrlObj.hostname === "localhost" ? false : true,
166+
},
167+
}
168+
: {}),
169+
};
170+
redisWrite = createRedisClient(redisWriteOptions);
171+
}
172+
173+
try {
174+
// Build the Redis key for environment-level currentConcurrency set
175+
// Format: engine:runqueue:{org:X}:proj:Y:env:Z:currentConcurrency
176+
const envConcurrencyKey = `engine:runqueue:{org:${environment.organizationId}}:proj:${environment.projectId}:env:${environmentId}:currentConcurrency`;
177+
178+
console.log(`\n🔑 Checking Redis key: ${envConcurrencyKey}`);
179+
180+
// Get all run IDs in the environment's currentConcurrency set
181+
const runIds = await redisRead.smembers(envConcurrencyKey);
182+
183+
if (runIds.length === 0) {
184+
console.log(`✅ No runs in currentConcurrency set`);
185+
return;
186+
}
187+
188+
console.log(`📊 Found ${runIds.length} runs in currentConcurrency set`);
189+
190+
// Query database for latest snapshots and queue info of these runs
191+
const runInfo = await prisma.$queryRaw<
192+
Array<{
193+
runId: string;
194+
executionStatus: TaskRunExecutionStatus;
195+
snapshotCreatedAt: Date;
196+
organizationId: string;
197+
projectId: string;
198+
environmentId: string;
199+
taskIdentifier: string;
200+
queue: string;
201+
concurrencyKey: string | null;
202+
}>
203+
>`
204+
SELECT DISTINCT ON (s."runId")
205+
s."runId",
206+
s."executionStatus",
207+
s."createdAt" as "snapshotCreatedAt",
208+
r."organizationId",
209+
r."projectId",
210+
r."runtimeEnvironmentId" as "environmentId",
211+
r."taskIdentifier",
212+
r."queue",
213+
r."concurrencyKey"
214+
FROM "TaskRunExecutionSnapshot" s
215+
INNER JOIN "TaskRun" r ON r.id = s."runId"
216+
WHERE s."runId" = ANY(${runIds})
217+
AND s."isValid" = true
218+
ORDER BY s."runId", s."createdAt" DESC
219+
`;
220+
221+
const stuckRuns: StuckRun[] = [];
222+
223+
// Find runs with QUEUED execution status (inconsistent state)
224+
for (const info of runInfo) {
225+
if (info.executionStatus === "QUEUED") {
226+
stuckRuns.push({
227+
runId: info.runId,
228+
orgId: info.organizationId,
229+
projectId: info.projectId,
230+
environmentId: info.environmentId,
231+
queue: info.queue,
232+
concurrencyKey: info.concurrencyKey,
233+
executionStatus: info.executionStatus,
234+
snapshotCreatedAt: info.snapshotCreatedAt,
235+
taskIdentifier: info.taskIdentifier,
236+
});
237+
}
238+
}
239+
240+
if (stuckRuns.length === 0) {
241+
console.log(`✅ No stuck runs found (all runs have progressed beyond QUEUED state)`);
242+
return;
243+
}
244+
245+
console.log(`\n⚠️ Found ${stuckRuns.length} stuck runs in QUEUED state:`);
246+
console.log(`════════════════════════════════════════════════════════════════`);
247+
248+
for (const run of stuckRuns) {
249+
const age = Date.now() - run.snapshotCreatedAt.getTime();
250+
const ageMinutes = Math.floor(age / 1000 / 60);
251+
console.log(` • Run: ${run.runId}`);
252+
console.log(` Task: ${run.taskIdentifier}`);
253+
console.log(` Queue: ${run.queue}`);
254+
console.log(` Concurrency Key: ${run.concurrencyKey || "(none)"}`);
255+
console.log(` Status: ${run.executionStatus}`);
256+
console.log(` Stuck for: ${ageMinutes} minutes`);
257+
console.log(` Snapshot created: ${run.snapshotCreatedAt.toISOString()}`);
258+
console.log();
259+
}
260+
261+
// Prepare recovery operations
262+
console.log(`\n⚡ ${executeMode ? "Executing" : "Planning"} recovery for ${stuckRuns.length} stuck runs`);
263+
console.log(`This will:`);
264+
console.log(` 1. Add each run back to its specific queue sorted set`);
265+
console.log(` 2. Remove each run from the queue-specific currentConcurrency set`);
266+
console.log(` 3. Remove each run from the env-level currentConcurrency set`);
267+
console.log();
268+
269+
let successCount = 0;
270+
let failureCount = 0;
271+
272+
const currentTimestamp = Date.now();
273+
274+
for (const run of stuckRuns) {
275+
try {
276+
// Build queue key: engine:runqueue:{org:X}:proj:Y:env:Z:queue:QUEUENAME
277+
// Build queue currentConcurrency key: engine:runqueue:{org:X}:proj:Y:env:Z:queue:QUEUENAME:currentConcurrency
278+
const queueKey = run.concurrencyKey
279+
? `engine:runqueue:{org:${run.orgId}}:proj:${run.projectId}:env:${run.environmentId}:queue:${run.queue}:ck:${run.concurrencyKey}`
280+
: `engine:runqueue:{org:${run.orgId}}:proj:${run.projectId}:env:${run.environmentId}:queue:${run.queue}`;
281+
282+
const queueConcurrencyKey = `${queueKey}:currentConcurrency`;
283+
284+
const operations: RedisOperation[] = [
285+
{
286+
type: "ZADD",
287+
key: queueKey,
288+
args: [currentTimestamp, run.runId],
289+
description: `Add run to queue sorted set with score ${currentTimestamp}`,
290+
},
291+
{
292+
type: "SREM",
293+
key: queueConcurrencyKey,
294+
args: [run.runId],
295+
description: `Remove run from queue currentConcurrency set`,
296+
},
297+
{
298+
type: "SREM",
299+
key: envConcurrencyKey,
300+
args: [run.runId],
301+
description: `Remove run from env currentConcurrency set`,
302+
},
303+
];
304+
305+
if (executeMode && redisWrite) {
306+
// Execute operations using the write client
307+
await redisWrite.zadd(queueKey, currentTimestamp, run.runId);
308+
const removedFromQueue = await redisWrite.srem(queueConcurrencyKey, run.runId);
309+
const removedFromEnv = await redisWrite.srem(envConcurrencyKey, run.runId);
310+
311+
console.log(` ✓ Recovered run ${run.runId} (${run.taskIdentifier})`);
312+
if (removedFromQueue === 0) {
313+
console.log(` ⚠ Run was not in queue currentConcurrency set`);
314+
}
315+
if (removedFromEnv === 0) {
316+
console.log(` ⚠ Run was not in env currentConcurrency set`);
317+
}
318+
successCount++;
319+
} else {
320+
// Dry run - just show what would be done
321+
console.log(` 📝 Would recover run ${run.runId} (${run.taskIdentifier}):`);
322+
for (const op of operations) {
323+
console.log(` ${op.type} ${op.key}`);
324+
console.log(` Args: ${JSON.stringify(op.args)}`);
325+
console.log(` (${op.description})`);
326+
}
327+
successCount++;
328+
}
329+
} catch (error) {
330+
console.error(` ✗ Failed to recover run ${run.runId}:`, error);
331+
failureCount++;
332+
}
333+
}
334+
335+
console.log(`\n═══════════════════════════════════════════════════════════════`);
336+
if (executeMode) {
337+
console.log(`✅ Recovery complete!`);
338+
console.log(` Recovered: ${successCount}`);
339+
console.log(` Failed: ${failureCount}`);
340+
console.log();
341+
console.log(`ℹ️ Note: The recovered runs should be automatically dequeued`);
342+
console.log(` by the master queue consumers within a few seconds.`);
343+
} else {
344+
console.log(`📋 Dry run complete - no changes were made`);
345+
console.log(` Would recover: ${successCount}`);
346+
console.log(` Would fail: ${failureCount}`);
347+
console.log();
348+
console.log(`💡 To execute these changes, run again with a redisWriteUrl argument`);
349+
}
350+
} finally {
351+
await redisRead.quit();
352+
if (redisWrite) {
353+
await redisWrite.quit();
354+
}
355+
}
356+
} catch (error) {
357+
console.error("❌ Error during recovery:", error);
358+
throw error;
359+
} finally {
360+
await prisma.$disconnect();
361+
}
362+
}
363+
364+
main().catch((error) => {
365+
console.error("Fatal error:", error);
366+
process.exit(1);
367+
});

0 commit comments

Comments
 (0)