Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions apps/webapp/app/runEngine/concerns/queues.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,35 @@ import { env } from "~/env.server";
import { tryCatch } from "@trigger.dev/core/v3";
import { ServiceValidationError } from "~/v3/services/common.server";

/**
* Extract the queue name from a queue option that may be:
* - An object with a string `name` property: { name: "queue-name" }
* - A double-wrapped object (bug case): { name: { name: "queue-name", ... } }
*
* This handles the case where the SDK accidentally double-wraps the queue
* option when it's already an object with a name property.
*/
function extractQueueName(queue: { name?: unknown } | undefined): string | undefined {
if (!queue?.name) {
return undefined;
}

// Normal case: queue.name is a string
if (typeof queue.name === "string") {
return queue.name;
}

// Double-wrapped case: queue.name is an object with its own name property
if (typeof queue.name === "object" && queue.name !== null && "name" in queue.name) {
const innerName = (queue.name as { name: unknown }).name;
if (typeof innerName === "string") {
return innerName;
}
}

return undefined;
}

export class DefaultQueueManager implements QueueManager {
constructor(
private readonly prisma: PrismaClientOrTransaction,
Expand All @@ -32,8 +61,8 @@ export class DefaultQueueManager implements QueueManager {
// Determine queue name based on lockToVersion and provided options
if (lockedBackgroundWorker) {
// Task is locked to a specific worker version
if (request.body.options?.queue?.name) {
const specifiedQueueName = request.body.options.queue.name;
const specifiedQueueName = extractQueueName(request.body.options?.queue);
if (specifiedQueueName) {
// A specific queue name is provided
const specifiedQueue = await this.prisma.taskQueue.findFirst({
// Validate it exists for the locked worker
Expand Down Expand Up @@ -126,8 +155,10 @@ export class DefaultQueueManager implements QueueManager {
const { taskId, environment, body } = request;
const { queue } = body.options ?? {};

if (queue?.name) {
return queue.name;
// Use extractQueueName to handle double-wrapped queue objects
const queueName = extractQueueName(queue);
if (queueName) {
return queueName;
}

const defaultQueueName = `task/${taskId}`;
Expand Down
174 changes: 174 additions & 0 deletions references/hello-world/src/trigger/batches.ts
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,180 @@ export const simpleTask = task({
},
});

// ============================================================================
// Queue Option Tests
// ============================================================================

/**
* Task that runs in a specific queue for testing queue option handling.
*/
export const queuedTask = task({
id: "queued-task",
queue: {
name: "test-queue-for-batch",
},
run: async (payload: { index: number; testId: string }) => {
logger.info(`Processing queued task ${payload.index}`, { payload });
await setTimeout(100);
return {
index: payload.index,
testId: payload.testId,
processedAt: Date.now(),
};
},
});

/**
* Test: Batch trigger with queue option as object
*
* This test verifies that the queue option works correctly when passed
* through batch trigger. The SDK passes queue as { name: "queue-name" }
* which should be handled correctly by the server.
*
* This tests the fix for the double-wrapping bug where queue objects
* like { name: "queue-name", concurrencyLimit: 20 } could get wrapped
* into { name: { name: "queue-name", concurrencyLimit: 20 } }.
*/
export const batchTriggerWithQueueOption = task({
id: "batch-trigger-with-queue-option",
maxDuration: 120,
run: async (payload: { count: number; useObjectQueue?: boolean }) => {
const count = payload.count || 5;
const testId = `queue-test-${Date.now()}`;

// If useObjectQueue is true, we bypass the SDK types to send queue as an object
// This simulates what might happen if someone calls the API directly with wrong format
const queueValue = payload.useObjectQueue
? ({ name: "test-queue-for-batch", concurrencyLimit: 20 } as unknown as string)
: "test-queue-for-batch";

// Generate batch items with queue option specified
const items = Array.from({ length: count }, (_, i) => ({
payload: { index: i, testId },
options: {
queue: queueValue,
// Also test with lockToVersion since the error showed workers.some.id
// which only appears in the lockedBackgroundWorker code path
},
}));

logger.info("Starting batch trigger with queue option", {
count,
testId,
useObjectQueue: payload.useObjectQueue,
queueValue,
});

// Trigger the batch with queue option
const result = await queuedTask.batchTrigger(items);

logger.info("Batch triggered successfully", {
batchId: result.batchId,
runCount: result.runCount,
});

// Wait for runs to complete
await setTimeout(5000);

// Retrieve batch to check results
const batchResult = await batch.retrieve(result.batchId);

return {
success: true,
batchId: result.batchId,
runCount: result.runCount,
batchStatus: batchResult.status,
testId,
};
},
});

/**
* Test: Batch triggerAndWait with queue option
*
* Similar to above but waits for all runs to complete.
*/
export const batchTriggerAndWaitWithQueueOption = task({
id: "batch-trigger-and-wait-with-queue-option",
maxDuration: 120,
run: async (payload: { count: number }) => {
const count = payload.count || 5;
const testId = `queue-wait-test-${Date.now()}`;

// Generate items with queue option
const items = Array.from({ length: count }, (_, i) => ({
payload: { index: i, testId },
options: {
queue: "test-queue-for-batch",
},
}));

logger.info("Starting batch triggerAndWait with queue option", { count, testId });

// Trigger and wait
const results = await queuedTask.batchTriggerAndWait(items);

const successCount = results.runs.filter((r) => r.ok).length;
const outputs = results.runs.filter((r) => r.ok).map((r) => (r.ok ? r.output : null));

return {
success: successCount === count,
successCount,
totalCount: count,
outputs,
testId,
};
},
});

/**
* Test: Streaming batch trigger with queue option
*
* Tests that streaming batches also work correctly with queue options.
*/
export const streamingBatchWithQueueOption = task({
id: "streaming-batch-with-queue-option",
maxDuration: 120,
run: async (payload: { count: number }) => {
const count = payload.count || 10;
const testId = `stream-queue-test-${Date.now()}`;

// Async generator that yields items with queue option
async function* generateItems() {
for (let i = 0; i < count; i++) {
yield {
payload: { index: i, testId },
options: {
queue: "test-queue-for-batch",
},
};
}
}

logger.info("Starting streaming batch with queue option", { count, testId });

// Trigger using the generator
const result = await queuedTask.batchTrigger(generateItems());

logger.info("Streaming batch triggered", {
batchId: result.batchId,
runCount: result.runCount,
});

// Wait and check results
await setTimeout(5000);
const batchResult = await batch.retrieve(result.batchId);

return {
success: true,
batchId: result.batchId,
runCount: result.runCount,
batchStatus: batchResult.status,
testId,
};
},
});

// ============================================================================
// Large Payload Examples (R2 Offloading)
// ============================================================================
Expand Down