Skip to content

Commit f232367

Browse files
authored
Merge branch 'main' into feat/tri-6733-reset-an-idempotencykey
2 parents 4463cf5 + a999d9e commit f232367

File tree

92 files changed

+20803
-796
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

92 files changed

+20803
-796
lines changed

.changeset/fluffy-crews-rhyme.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
feat: Support for new batch trigger system

.env.example

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,10 @@ POSTHOG_PROJECT_KEY=
8585
# These control the server-side internal telemetry
8686
# INTERNAL_OTEL_TRACE_EXPORTER_URL=<URL to send traces to>
8787
# INTERNAL_OTEL_TRACE_LOGGING_ENABLED=1
88-
# INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0,
88+
# INTERNAL_OTEL_TRACE_INSTRUMENT_PRISMA_ENABLED=0
89+
90+
# Enable local observability stack (requires `pnpm run docker` to start otel-collector)
91+
# Uncomment these to send metrics to the local Prometheus via OTEL Collector:
92+
# INTERNAL_OTEL_METRIC_EXPORTER_ENABLED=1
93+
# INTERNAL_OTEL_METRIC_EXPORTER_URL=http://localhost:4318/v1/metrics
94+
# INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL_MS=15000

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,5 @@ apps/**/public/build
6262
/packages/trigger-sdk/src/package.json
6363
/packages/python/src/package.json
6464
.claude
65-
.mcp.log
65+
.mcp.log
66+
.cursor/debug.log

apps/webapp/app/components/runs/v3/BatchStatus.tsx

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
1-
import { CheckCircleIcon, XCircleIcon } from "@heroicons/react/20/solid";
1+
import {
2+
CheckCircleIcon,
3+
ExclamationTriangleIcon,
4+
XCircleIcon,
5+
} from "@heroicons/react/20/solid";
26
import type { BatchTaskRunStatus } from "@trigger.dev/database";
37
import assertNever from "assert-never";
48
import { Spinner } from "~/components/primitives/Spinner";
59
import { cn } from "~/utils/cn";
610

7-
export const allBatchStatuses = ["PENDING", "COMPLETED", "ABORTED"] as const satisfies Readonly<
8-
Array<BatchTaskRunStatus>
9-
>;
11+
export const allBatchStatuses = [
12+
"PROCESSING",
13+
"PENDING",
14+
"COMPLETED",
15+
"PARTIAL_FAILED",
16+
"ABORTED",
17+
] as const satisfies Readonly<Array<BatchTaskRunStatus>>;
1018

1119
const descriptions: Record<BatchTaskRunStatus, string> = {
20+
PROCESSING: "The batch is being processed and runs are being created.",
1221
PENDING: "The batch has child runs that have not yet completed.",
1322
COMPLETED: "All the batch child runs have finished.",
14-
ABORTED: "The batch was aborted because some child tasks could not be triggered.",
23+
PARTIAL_FAILED: "Some runs failed to be created. Successfully created runs are still executing.",
24+
ABORTED: "The batch was aborted because child tasks could not be triggered.",
1525
};
1626

1727
export function descriptionForBatchStatus(status: BatchTaskRunStatus): string {
@@ -47,10 +57,14 @@ export function BatchStatusIcon({
4757
className: string;
4858
}) {
4959
switch (status) {
60+
case "PROCESSING":
61+
return <Spinner className={cn(batchStatusColor(status), className)} />;
5062
case "PENDING":
5163
return <Spinner className={cn(batchStatusColor(status), className)} />;
5264
case "COMPLETED":
5365
return <CheckCircleIcon className={cn(batchStatusColor(status), className)} />;
66+
case "PARTIAL_FAILED":
67+
return <ExclamationTriangleIcon className={cn(batchStatusColor(status), className)} />;
5468
case "ABORTED":
5569
return <XCircleIcon className={cn(batchStatusColor(status), className)} />;
5670
default: {
@@ -61,10 +75,14 @@ export function BatchStatusIcon({
6175

6276
export function batchStatusColor(status: BatchTaskRunStatus): string {
6377
switch (status) {
78+
case "PROCESSING":
79+
return "text-blue-500";
6480
case "PENDING":
6581
return "text-pending";
6682
case "COMPLETED":
6783
return "text-success";
84+
case "PARTIAL_FAILED":
85+
return "text-warning";
6886
case "ABORTED":
6987
return "text-error";
7088
default: {
@@ -75,10 +93,14 @@ export function batchStatusColor(status: BatchTaskRunStatus): string {
7593

7694
export function batchStatusTitle(status: BatchTaskRunStatus): string {
7795
switch (status) {
96+
case "PROCESSING":
97+
return "Processing";
7898
case "PENDING":
7999
return "In progress";
80100
case "COMPLETED":
81101
return "Completed";
102+
case "PARTIAL_FAILED":
103+
return "Partial failure";
82104
case "ABORTED":
83105
return "Aborted";
84106
default: {

apps/webapp/app/entry.server.tsx

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,28 @@
1-
import {
2-
createReadableStreamFromReadable,
3-
type DataFunctionArgs,
4-
type EntryContext,
5-
} from "@remix-run/node"; // or cloudflare/deno
1+
import { createReadableStreamFromReadable, type EntryContext } from "@remix-run/node"; // or cloudflare/deno
62
import { RemixServer } from "@remix-run/react";
3+
import { wrapHandleErrorWithSentry } from "@sentry/remix";
74
import { parseAcceptLanguage } from "intl-parse-accept-language";
85
import isbot from "isbot";
96
import { renderToPipeableStream } from "react-dom/server";
107
import { PassThrough } from "stream";
118
import * as Worker from "~/services/worker.server";
9+
import { bootstrap } from "./bootstrap";
1210
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
1311
import {
1412
OperatingSystemContextProvider,
1513
OperatingSystemPlatform,
1614
} from "./components/primitives/OperatingSystemProvider";
15+
import { Prisma } from "./db.server";
16+
import { env } from "./env.server";
17+
import { eventLoopMonitor } from "./eventLoopMonitor.server";
18+
import { logger } from "./services/logger.server";
19+
import { resourceMonitor } from "./services/resourceMonitor.server";
1720
import { singleton } from "./utils/singleton";
18-
import { bootstrap } from "./bootstrap";
19-
import { wrapHandleErrorWithSentry } from "@sentry/remix";
21+
import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
22+
import {
23+
registerRunEngineEventBusHandlers,
24+
setupBatchQueueCallbacks,
25+
} from "./v3/runEngineHandlers.server";
2026

2127
const ABORT_DELAY = 30000;
2228

@@ -228,19 +234,13 @@ process.on("uncaughtException", (error, origin) => {
228234
});
229235

230236
singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
237+
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);
231238

232239
export { apiRateLimiter } from "./services/apiRateLimit.server";
233240
export { engineRateLimiter } from "./services/engineRateLimit.server";
241+
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
234242
export { socketIo } from "./v3/handleSocketIo.server";
235243
export { wss } from "./v3/handleWebsockets.server";
236-
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
237-
import { eventLoopMonitor } from "./eventLoopMonitor.server";
238-
import { env } from "./env.server";
239-
import { logger } from "./services/logger.server";
240-
import { Prisma } from "./db.server";
241-
import { registerRunEngineEventBusHandlers } from "./v3/runEngineHandlers.server";
242-
import { remoteBuildsEnabled } from "./v3/remoteImageBuilder.server";
243-
import { resourceMonitor } from "./services/resourceMonitor.server";
244244

245245
if (env.EVENT_LOOP_MONITOR_ENABLED === "1") {
246246
eventLoopMonitor.enable();

apps/webapp/app/env.server.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,7 @@ const EnvironmentSchema = z
528528
MAXIMUM_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
529529
MAXIMUM_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(10_000),
530530
TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
531+
BATCH_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().optional(), // Defaults to TASK_PAYLOAD_OFFLOAD_THRESHOLD if not set
531532
TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
532533
BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(1_000_000), // 1MB
533534
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(262_144), // 256KB
@@ -537,6 +538,14 @@ const EnvironmentSchema = z
537538
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
538539
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
539540

541+
// 2-phase batch API settings
542+
STREAMING_BATCH_MAX_ITEMS: z.coerce.number().int().default(1_000), // Max items in streaming batch
543+
STREAMING_BATCH_ITEM_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728),
544+
BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
545+
BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
546+
BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
547+
BATCH_CONCURRENCY_LIMIT_DEFAULT: z.coerce.number().int().default(10),
548+
540549
REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
541550
REALTIME_STREAM_MAX_LENGTH: z.coerce.number().int().default(1000),
542551
REALTIME_STREAM_TTL: z.coerce
@@ -931,6 +940,17 @@ const EnvironmentSchema = z
931940
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
932941
BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
933942

943+
// BatchQueue DRR settings (Run Engine v2)
944+
BATCH_QUEUE_DRR_QUANTUM: z.coerce.number().int().default(25),
945+
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(100),
946+
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().default(3),
947+
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(50),
948+
// Global rate limit: max items processed per second across all consumers
949+
// If not set, no global rate limiting is applied
950+
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
951+
// Processing concurrency: max concurrent batch items being processed per environment
952+
BATCH_CONCURRENCY_DEFAULT_CONCURRENCY: z.coerce.number().int().default(1),
953+
934954
ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
935955
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
936956
ADMIN_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),

apps/webapp/app/presenters/v3/BatchListPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ WHERE
195195
throw new Error(`Environment not found for Batch ${batch.id}`);
196196
}
197197

198-
const hasFinished = batch.status !== "PENDING";
198+
const hasFinished = batch.status !== "PENDING" && batch.status !== "PROCESSING";
199199

200200
return {
201201
id: batch.id,
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { type BatchTaskRunStatus } from "@trigger.dev/database";
2+
import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
3+
import { engine } from "~/v3/runEngine.server";
4+
import { BasePresenter } from "./basePresenter.server";
5+
6+
type BatchPresenterOptions = {
7+
environmentId: string;
8+
batchId: string;
9+
userId?: string;
10+
};
11+
12+
export type BatchPresenterData = Awaited<ReturnType<BatchPresenter["call"]>>;
13+
14+
export class BatchPresenter extends BasePresenter {
15+
public async call({ environmentId, batchId, userId }: BatchPresenterOptions) {
16+
const batch = await this._replica.batchTaskRun.findFirst({
17+
select: {
18+
id: true,
19+
friendlyId: true,
20+
status: true,
21+
runCount: true,
22+
batchVersion: true,
23+
createdAt: true,
24+
updatedAt: true,
25+
completedAt: true,
26+
processingStartedAt: true,
27+
processingCompletedAt: true,
28+
successfulRunCount: true,
29+
failedRunCount: true,
30+
idempotencyKey: true,
31+
runtimeEnvironment: {
32+
select: {
33+
id: true,
34+
type: true,
35+
slug: true,
36+
orgMember: {
37+
select: {
38+
user: {
39+
select: {
40+
id: true,
41+
name: true,
42+
displayName: true,
43+
},
44+
},
45+
},
46+
},
47+
},
48+
},
49+
errors: {
50+
select: {
51+
id: true,
52+
index: true,
53+
taskIdentifier: true,
54+
error: true,
55+
errorCode: true,
56+
createdAt: true,
57+
},
58+
orderBy: {
59+
index: "asc",
60+
},
61+
},
62+
},
63+
where: {
64+
runtimeEnvironmentId: environmentId,
65+
friendlyId: batchId,
66+
},
67+
});
68+
69+
if (!batch) {
70+
throw new Error("Batch not found");
71+
}
72+
73+
const hasFinished = batch.status !== "PENDING" && batch.status !== "PROCESSING";
74+
const isV2 = batch.batchVersion === "runengine:v2";
75+
76+
// For v2 batches in PROCESSING state, get live progress from Redis
77+
// This provides real-time updates without waiting for the batch to complete
78+
let liveSuccessCount = batch.successfulRunCount ?? 0;
79+
let liveFailureCount = batch.failedRunCount ?? 0;
80+
81+
if (isV2 && batch.status === "PROCESSING") {
82+
const liveProgress = await engine.getBatchQueueProgress(batch.id);
83+
if (liveProgress) {
84+
liveSuccessCount = liveProgress.successCount;
85+
liveFailureCount = liveProgress.failureCount;
86+
}
87+
}
88+
89+
return {
90+
id: batch.id,
91+
friendlyId: batch.friendlyId,
92+
status: batch.status as BatchTaskRunStatus,
93+
runCount: batch.runCount,
94+
batchVersion: batch.batchVersion,
95+
isV2,
96+
createdAt: batch.createdAt.toISOString(),
97+
updatedAt: batch.updatedAt.toISOString(),
98+
completedAt: batch.completedAt?.toISOString(),
99+
processingStartedAt: batch.processingStartedAt?.toISOString(),
100+
processingCompletedAt: batch.processingCompletedAt?.toISOString(),
101+
finishedAt: batch.completedAt
102+
? batch.completedAt.toISOString()
103+
: hasFinished
104+
? batch.updatedAt.toISOString()
105+
: undefined,
106+
hasFinished,
107+
successfulRunCount: liveSuccessCount,
108+
failedRunCount: liveFailureCount,
109+
idempotencyKey: batch.idempotencyKey,
110+
environment: displayableEnvironment(batch.runtimeEnvironment, userId),
111+
errors: batch.errors.map((error) => ({
112+
id: error.id,
113+
index: error.index,
114+
taskIdentifier: error.taskIdentifier,
115+
error: error.error,
116+
errorCode: error.errorCode,
117+
createdAt: error.createdAt.toISOString(),
118+
})),
119+
};
120+
}
121+
}
122+

0 commit comments

Comments
 (0)