Skip to content

Commit d820896

Browse files
authored
fix(batch): extract the queue name out of an already nested queue option (#2807)
1 parent acc10e8 commit d820896

File tree

2 files changed

+209
-4
lines changed

2 files changed

+209
-4
lines changed

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,35 @@ import { env } from "~/env.server";
1616
import { tryCatch } from "@trigger.dev/core/v3";
1717
import { ServiceValidationError } from "~/v3/services/common.server";
1818

19+
/**
20+
* Extract the queue name from a queue option that may be:
21+
* - An object with a string `name` property: { name: "queue-name" }
22+
* - A double-wrapped object (bug case): { name: { name: "queue-name", ... } }
23+
*
24+
* This handles the case where the SDK accidentally double-wraps the queue
25+
* option when it's already an object with a name property.
26+
*/
27+
function extractQueueName(queue: { name?: unknown } | undefined): string | undefined {
28+
if (!queue?.name) {
29+
return undefined;
30+
}
31+
32+
// Normal case: queue.name is a string
33+
if (typeof queue.name === "string") {
34+
return queue.name;
35+
}
36+
37+
// Double-wrapped case: queue.name is an object with its own name property
38+
if (typeof queue.name === "object" && queue.name !== null && "name" in queue.name) {
39+
const innerName = (queue.name as { name: unknown }).name;
40+
if (typeof innerName === "string") {
41+
return innerName;
42+
}
43+
}
44+
45+
return undefined;
46+
}
47+
1948
export class DefaultQueueManager implements QueueManager {
2049
constructor(
2150
private readonly prisma: PrismaClientOrTransaction,
@@ -32,8 +61,8 @@ export class DefaultQueueManager implements QueueManager {
3261
// Determine queue name based on lockToVersion and provided options
3362
if (lockedBackgroundWorker) {
3463
// Task is locked to a specific worker version
35-
if (request.body.options?.queue?.name) {
36-
const specifiedQueueName = request.body.options.queue.name;
64+
const specifiedQueueName = extractQueueName(request.body.options?.queue);
65+
if (specifiedQueueName) {
3766
// A specific queue name is provided
3867
const specifiedQueue = await this.prisma.taskQueue.findFirst({
3968
// Validate it exists for the locked worker
@@ -126,8 +155,10 @@ export class DefaultQueueManager implements QueueManager {
126155
const { taskId, environment, body } = request;
127156
const { queue } = body.options ?? {};
128157

129-
if (queue?.name) {
130-
return queue.name;
158+
// Use extractQueueName to handle double-wrapped queue objects
159+
const queueName = extractQueueName(queue);
160+
if (queueName) {
161+
return queueName;
131162
}
132163

133164
const defaultQueueName = `task/${taskId}`;

references/hello-world/src/trigger/batches.ts

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,180 @@ export const simpleTask = task({
484484
},
485485
});
486486

487+
// ============================================================================
488+
// Queue Option Tests
489+
// ============================================================================
490+
491+
/**
492+
* Task that runs in a specific queue for testing queue option handling.
493+
*/
494+
export const queuedTask = task({
495+
id: "queued-task",
496+
queue: {
497+
name: "test-queue-for-batch",
498+
},
499+
run: async (payload: { index: number; testId: string }) => {
500+
logger.info(`Processing queued task ${payload.index}`, { payload });
501+
await setTimeout(100);
502+
return {
503+
index: payload.index,
504+
testId: payload.testId,
505+
processedAt: Date.now(),
506+
};
507+
},
508+
});
509+
510+
/**
511+
* Test: Batch trigger with queue option as object
512+
*
513+
* This test verifies that the queue option works correctly when passed
514+
* through batch trigger. The SDK passes queue as { name: "queue-name" }
515+
* which should be handled correctly by the server.
516+
*
517+
* This tests the fix for the double-wrapping bug where queue objects
518+
* like { name: "queue-name", concurrencyLimit: 20 } could get wrapped
519+
* into { name: { name: "queue-name", concurrencyLimit: 20 } }.
520+
*/
521+
export const batchTriggerWithQueueOption = task({
522+
id: "batch-trigger-with-queue-option",
523+
maxDuration: 120,
524+
run: async (payload: { count: number; useObjectQueue?: boolean }) => {
525+
const count = payload.count || 5;
526+
const testId = `queue-test-${Date.now()}`;
527+
528+
// If useObjectQueue is true, we bypass the SDK types to send queue as an object
529+
// This simulates what might happen if someone calls the API directly with wrong format
530+
const queueValue = payload.useObjectQueue
531+
? ({ name: "test-queue-for-batch", concurrencyLimit: 20 } as unknown as string)
532+
: "test-queue-for-batch";
533+
534+
// Generate batch items with queue option specified
535+
const items = Array.from({ length: count }, (_, i) => ({
536+
payload: { index: i, testId },
537+
options: {
538+
queue: queueValue,
539+
// Also test with lockToVersion since the error showed workers.some.id
540+
// which only appears in the lockedBackgroundWorker code path
541+
},
542+
}));
543+
544+
logger.info("Starting batch trigger with queue option", {
545+
count,
546+
testId,
547+
useObjectQueue: payload.useObjectQueue,
548+
queueValue,
549+
});
550+
551+
// Trigger the batch with queue option
552+
const result = await queuedTask.batchTrigger(items);
553+
554+
logger.info("Batch triggered successfully", {
555+
batchId: result.batchId,
556+
runCount: result.runCount,
557+
});
558+
559+
// Wait for runs to complete
560+
await setTimeout(5000);
561+
562+
// Retrieve batch to check results
563+
const batchResult = await batch.retrieve(result.batchId);
564+
565+
return {
566+
success: true,
567+
batchId: result.batchId,
568+
runCount: result.runCount,
569+
batchStatus: batchResult.status,
570+
testId,
571+
};
572+
},
573+
});
574+
575+
/**
576+
* Test: Batch triggerAndWait with queue option
577+
*
578+
* Similar to above but waits for all runs to complete.
579+
*/
580+
export const batchTriggerAndWaitWithQueueOption = task({
581+
id: "batch-trigger-and-wait-with-queue-option",
582+
maxDuration: 120,
583+
run: async (payload: { count: number }) => {
584+
const count = payload.count || 5;
585+
const testId = `queue-wait-test-${Date.now()}`;
586+
587+
// Generate items with queue option
588+
const items = Array.from({ length: count }, (_, i) => ({
589+
payload: { index: i, testId },
590+
options: {
591+
queue: "test-queue-for-batch",
592+
},
593+
}));
594+
595+
logger.info("Starting batch triggerAndWait with queue option", { count, testId });
596+
597+
// Trigger and wait
598+
const results = await queuedTask.batchTriggerAndWait(items);
599+
600+
const successCount = results.runs.filter((r) => r.ok).length;
601+
const outputs = results.runs.filter((r) => r.ok).map((r) => (r.ok ? r.output : null));
602+
603+
return {
604+
success: successCount === count,
605+
successCount,
606+
totalCount: count,
607+
outputs,
608+
testId,
609+
};
610+
},
611+
});
612+
613+
/**
614+
* Test: Streaming batch trigger with queue option
615+
*
616+
* Tests that streaming batches also work correctly with queue options.
617+
*/
618+
export const streamingBatchWithQueueOption = task({
619+
id: "streaming-batch-with-queue-option",
620+
maxDuration: 120,
621+
run: async (payload: { count: number }) => {
622+
const count = payload.count || 10;
623+
const testId = `stream-queue-test-${Date.now()}`;
624+
625+
// Async generator that yields items with queue option
626+
async function* generateItems() {
627+
for (let i = 0; i < count; i++) {
628+
yield {
629+
payload: { index: i, testId },
630+
options: {
631+
queue: "test-queue-for-batch",
632+
},
633+
};
634+
}
635+
}
636+
637+
logger.info("Starting streaming batch with queue option", { count, testId });
638+
639+
// Trigger using the generator
640+
const result = await queuedTask.batchTrigger(generateItems());
641+
642+
logger.info("Streaming batch triggered", {
643+
batchId: result.batchId,
644+
runCount: result.runCount,
645+
});
646+
647+
// Wait and check results
648+
await setTimeout(5000);
649+
const batchResult = await batch.retrieve(result.batchId);
650+
651+
return {
652+
success: true,
653+
batchId: result.batchId,
654+
runCount: result.runCount,
655+
batchStatus: batchResult.status,
656+
testId,
657+
};
658+
},
659+
});
660+
487661
// ============================================================================
488662
// Large Payload Examples (R2 Offloading)
489663
// ============================================================================

0 commit comments

Comments
 (0)