Skip to content

Commit ef76ff7

Browse files
committed
restructure the batch queue callbacks to prevent circular import
1 parent 342c9fc commit ef76ff7

File tree

8 files changed

+221
-265
lines changed

8 files changed

+221
-265
lines changed

apps/webapp/app/entry.server.tsx

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,28 @@
1-
import {
2-
createReadableStreamFromReadable,
3-
type DataFunctionArgs,
4-
type EntryContext,
5-
} from "@remix-run/node"; // or cloudflare/deno
1+
import { createReadableStreamFromReadable, type EntryContext } from "@remix-run/node"; // or cloudflare/deno
62
import { RemixServer } from "@remix-run/react";
3+
import { wrapHandleErrorWithSentry } from "@sentry/remix";
74
import { parseAcceptLanguage } from "intl-parse-accept-language";
85
import isbot from "isbot";
96
import { renderToPipeableStream } from "react-dom/server";
107
import { PassThrough } from "stream";
118
import * as Worker from "~/services/worker.server";
9+
import { bootstrap } from "./bootstrap";
1210
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
1311
import {
1412
OperatingSystemContextProvider,
1513
OperatingSystemPlatform,
1614
} from "./components/primitives/OperatingSystemProvider";
15+
import { Prisma } from "./db.server";
16+
import { env } from "./env.server";
17+
import { eventLoopMonitor } from "./eventLoopMonitor.server";
18+
import { logger } from "./services/logger.server";
19+
import { resourceMonitor } from "./services/resourceMonitor.server";
1720
import { singleton } from "./utils/singleton";
18-
import { bootstrap } from "./bootstrap";
19-
import { wrapHandleErrorWithSentry } from "@sentry/remix";
21+
import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
22+
import {
23+
registerRunEngineEventBusHandlers,
24+
setupBatchQueueCallbacks,
25+
} from "./v3/runEngineHandlers.server";
2026

2127
const ABORT_DELAY = 30000;
2228

@@ -228,19 +234,13 @@ process.on("uncaughtException", (error, origin) => {
228234
});
229235

230236
singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
237+
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);
231238

232239
export { apiRateLimiter } from "./services/apiRateLimit.server";
233240
export { engineRateLimiter } from "./services/engineRateLimit.server";
241+
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
234242
export { socketIo } from "./v3/handleSocketIo.server";
235243
export { wss } from "./v3/handleWebsockets.server";
236-
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
237-
import { eventLoopMonitor } from "./eventLoopMonitor.server";
238-
import { env } from "./env.server";
239-
import { logger } from "./services/logger.server";
240-
import { Prisma } from "./db.server";
241-
import { registerRunEngineEventBusHandlers } from "./v3/runEngineHandlers.server";
242-
import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
243-
import { resourceMonitor } from "./services/resourceMonitor.server";
244244

245245
if (env.EVENT_LOOP_MONITOR_ENABLED === "1") {
246246
eventLoopMonitor.enable();

apps/webapp/app/routes/api.v3.batches.$batchId.items.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,6 @@ export async function action({ request, params }: ActionFunctionArgs) {
5858
return json({ error: authResult.error }, { status: 401 });
5959
}
6060

61-
// Verify BatchQueue is enabled
62-
if (!engine.isBatchQueueEnabled()) {
63-
return json(
64-
{
65-
error: "Streaming batch API is not available. BatchQueue is not enabled.",
66-
},
67-
{ status: 503 }
68-
);
69-
}
70-
7161
// Get the request body stream
7262
const body = request.body;
7363
if (!body) {

apps/webapp/app/routes/api.v3.batches.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,6 @@ const { action, loader } = createActionApiRoute(
5959
);
6060
}
6161

62-
// Verify BatchQueue is enabled
63-
if (!engine.isBatchQueueEnabled()) {
64-
return json(
65-
{
66-
error: "Streaming batch API is not available. BatchQueue is not enabled.",
67-
},
68-
{ status: 503 }
69-
);
70-
}
71-
7262
const {
7363
"trigger-version": triggerVersion,
7464
"x-trigger-span-parent-as-link": spanParentAsLink,

apps/webapp/app/runEngine/services/createBatch.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1+
import type { InitializeBatchOptions } from "@internal/run-engine";
12
import { type CreateBatchRequestBody, type CreateBatchResponse } from "@trigger.dev/core/v3";
23
import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic";
34
import { type BatchTaskRun, Prisma } from "@trigger.dev/database";
4-
import type { InitializeBatchOptions } from "@internal/run-engine";
55
import { Evt } from "evt";
66
import { prisma, type PrismaClientOrTransaction } from "~/db.server";
77
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
88
import { logger } from "~/services/logger.server";
9-
import { DefaultQueueManager } from "../concerns/queues.server";
10-
import { DefaultTriggerTaskValidator } from "../validators/triggerTaskValidator";
119
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
1210
import { BatchRateLimitExceededError, getBatchLimits } from "../concerns/batchLimits.server";
11+
import { DefaultQueueManager } from "../concerns/queues.server";
12+
import { DefaultTriggerTaskValidator } from "../validators/triggerTaskValidator";
1313

1414
export type CreateBatchServiceOptions = {
1515
triggerVersion?: string;
Lines changed: 2 additions & 203 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
1-
import { RunEngine, type CompleteBatchResult } from "@internal/run-engine";
2-
import { BatchTaskRunStatus, Prisma } from "@trigger.dev/database";
3-
import { SpanKind } from "@opentelemetry/api";
1+
import { RunEngine } from "@internal/run-engine";
42
import { $replica, prisma } from "~/db.server";
53
import { env } from "~/env.server";
64
import { createBatchGlobalRateLimiter } from "~/runEngine/concerns/batchGlobalRateLimiter.server";
5+
import { logger } from "~/services/logger.server";
76
import { defaultMachine, getCurrentPlan } from "~/services/platform.v3.server";
87
import { singleton } from "~/utils/singleton";
98
import { allMachines } from "./machinePresets.server";
109
import { meter, tracer } from "./tracer.server";
11-
import { logger } from "~/services/logger.server";
12-
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
13-
import { TriggerTaskService } from "./services/triggerTask.server";
1410

1511
export const engine = singleton("RunEngine", createRunEngine);
1612

@@ -188,202 +184,5 @@ function createRunEngine() {
188184
},
189185
});
190186

191-
// Set up BatchQueue callbacks if enabled
192-
if (engine.isBatchQueueEnabled()) {
193-
setupBatchQueueCallbacks(engine);
194-
}
195-
196187
return engine;
197188
}
198-
199-
/**
200-
* Normalize the payload from BatchQueue.
201-
*
202-
* Handles different payload types:
203-
* - "application/store": Already offloaded to R2, payload is the path - pass through as-is
204-
* - "application/json": May be a pre-serialized JSON string - parse to avoid double-stringification
205-
* - Other types: Pass through as-is
206-
*
207-
* @param payload - The raw payload from the batch item
208-
* @param payloadType - The payload type (e.g., "application/json", "application/store")
209-
*/
210-
function normalizePayload(payload: unknown, payloadType?: string): unknown {
211-
// For non-JSON payloads (including application/store for R2-offloaded payloads),
212-
// return as-is - no normalization needed
213-
if (payloadType !== "application/json" && payloadType !== undefined) {
214-
return payload;
215-
}
216-
217-
// For JSON payloads, if payload is a string, try to parse it
218-
// This handles pre-serialized JSON from the SDK
219-
if (typeof payload === "string") {
220-
try {
221-
return JSON.parse(payload);
222-
} catch {
223-
// If it's not valid JSON, return as-is
224-
return payload;
225-
}
226-
}
227-
228-
return payload;
229-
}
230-
231-
/**
232-
* Set up the BatchQueue processing callbacks.
233-
* These handle creating runs from batch items and completing batches.
234-
*
235-
* Payload handling:
236-
* - If payloadType is "application/store", the payload is an R2 path (already offloaded)
237-
* - DefaultPayloadProcessor in TriggerTaskService will pass it through without re-offloading
238-
* - The run engine will download from R2 when the task executes
239-
*/
240-
function setupBatchQueueCallbacks(engine: RunEngine) {
241-
// Item processing callback - creates a run for each batch item
242-
engine.setBatchProcessItemCallback(async ({ batchId, friendlyId, itemIndex, item, meta }) => {
243-
return tracer.startActiveSpan(
244-
"batch.processItem",
245-
{
246-
kind: SpanKind.INTERNAL,
247-
attributes: {
248-
"batch.id": friendlyId,
249-
"batch.item_index": itemIndex,
250-
"batch.task": item.task,
251-
"batch.environment_id": meta.environmentId,
252-
"batch.parent_run_id": meta.parentRunId ?? "",
253-
},
254-
},
255-
async (span) => {
256-
try {
257-
const triggerTaskService = new TriggerTaskService();
258-
259-
// Normalize payload - for application/store (R2 paths), this passes through as-is
260-
const payload = normalizePayload(item.payload, item.payloadType);
261-
262-
const result = await triggerTaskService.call(
263-
item.task,
264-
{
265-
id: meta.environmentId,
266-
type: meta.environmentType,
267-
organizationId: meta.organizationId,
268-
projectId: meta.projectId,
269-
organization: { id: meta.organizationId },
270-
project: { id: meta.projectId },
271-
} as AuthenticatedEnvironment,
272-
{
273-
payload,
274-
options: {
275-
...(item.options as Record<string, unknown>),
276-
payloadType: item.payloadType,
277-
parentRunId: meta.parentRunId,
278-
resumeParentOnCompletion: meta.resumeParentOnCompletion,
279-
parentBatch: batchId,
280-
},
281-
},
282-
{
283-
triggerVersion: meta.triggerVersion,
284-
traceContext: meta.traceContext as Record<string, unknown> | undefined,
285-
spanParentAsLink: meta.spanParentAsLink,
286-
batchId,
287-
batchIndex: itemIndex,
288-
skipChecks: true, // Already validated at batch level
289-
realtimeStreamsVersion: meta.realtimeStreamsVersion,
290-
},
291-
"V2"
292-
);
293-
294-
if (result) {
295-
span.setAttribute("batch.result.run_id", result.run.friendlyId);
296-
span.end();
297-
return { success: true as const, runId: result.run.friendlyId };
298-
} else {
299-
span.setAttribute("batch.result.error", "TriggerTaskService returned undefined");
300-
span.end();
301-
return {
302-
success: false as const,
303-
error: "TriggerTaskService returned undefined",
304-
errorCode: "TRIGGER_FAILED",
305-
};
306-
}
307-
} catch (error) {
308-
span.setAttribute(
309-
"batch.result.error",
310-
error instanceof Error ? error.message : String(error)
311-
);
312-
span.recordException(error instanceof Error ? error : new Error(String(error)));
313-
span.end();
314-
return {
315-
success: false as const,
316-
error: error instanceof Error ? error.message : String(error),
317-
errorCode: "TRIGGER_ERROR",
318-
};
319-
}
320-
}
321-
);
322-
});
323-
324-
// Batch completion callback - updates Postgres with results
325-
engine.setBatchCompletionCallback(async (result: CompleteBatchResult) => {
326-
const { batchId, runIds, successfulRunCount, failedRunCount, failures } = result;
327-
328-
// Determine final status
329-
let status: BatchTaskRunStatus;
330-
if (failedRunCount > 0 && successfulRunCount === 0) {
331-
status = "ABORTED";
332-
} else if (failedRunCount > 0) {
333-
status = "PARTIAL_FAILED";
334-
} else {
335-
status = "PENDING"; // All runs created, waiting for completion
336-
}
337-
338-
try {
339-
// Update BatchTaskRun
340-
await prisma.batchTaskRun.update({
341-
where: { id: batchId },
342-
data: {
343-
status,
344-
runIds,
345-
successfulRunCount,
346-
failedRunCount,
347-
completedAt: status === "ABORTED" ? new Date() : undefined,
348-
processingCompletedAt: new Date(),
349-
},
350-
});
351-
352-
// Create error records if there were failures
353-
if (failures.length > 0) {
354-
for (const failure of failures) {
355-
await prisma.batchTaskRunError.create({
356-
data: {
357-
batchTaskRunId: batchId,
358-
index: failure.index,
359-
taskIdentifier: failure.taskIdentifier,
360-
payload: failure.payload,
361-
options: failure.options as Prisma.InputJsonValue | undefined,
362-
error: failure.error,
363-
errorCode: failure.errorCode,
364-
},
365-
});
366-
}
367-
}
368-
369-
// Try to complete the batch (handles waitpoint completion if all runs are done)
370-
if (status !== "ABORTED") {
371-
await engine.tryCompleteBatch({ batchId });
372-
}
373-
374-
logger.info("Batch completion handled", {
375-
batchId,
376-
status,
377-
successfulRunCount,
378-
failedRunCount,
379-
});
380-
} catch (error) {
381-
logger.error("Failed to handle batch completion", {
382-
batchId,
383-
error: error instanceof Error ? error.message : String(error),
384-
});
385-
}
386-
});
387-
388-
logger.info("BatchQueue callbacks configured");
389-
}

0 commit comments

Comments
 (0)