Skip to content

Commit f564845

Browse files
committed
v4: eagerly fork child process before warm start
Also fixes an issue where the attempt span events weren't coming through in the partial spans
1 parent 18ad897 commit f564845

File tree

15 files changed

+224
-67
lines changed

15 files changed

+224
-67
lines changed

packages/cli-v3/src/entryPoints/dev-run-controller.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,15 @@ export class DevRunController {
622622
version: this.opts.worker.serverWorker?.version,
623623
engine: "V2",
624624
},
625+
machine: execution.machine,
626+
}).initialize();
627+
628+
logger.debug("executing task run process", {
629+
attemptId: execution.attempt.id,
630+
runId: execution.run.id,
631+
});
632+
633+
const completion = await this.taskRunProcess.execute({
625634
payload: {
626635
execution,
627636
traceContext: execution.run.traceContext ?? {},
@@ -630,15 +639,6 @@ export class DevRunController {
630639
messageId: run.friendlyId,
631640
});
632641

633-
await this.taskRunProcess.initialize();
634-
635-
logger.debug("executing task run process", {
636-
attemptId: execution.attempt.id,
637-
runId: execution.run.id,
638-
});
639-
640-
const completion = await this.taskRunProcess.execute();
641-
642642
logger.debug("Completed run", completion);
643643

644644
try {

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {
3636
logLevels,
3737
ManagedRuntimeManager,
3838
OtelTaskLogger,
39+
populateEnv,
3940
StandardLifecycleHooksManager,
4041
StandardLocalsManager,
4142
StandardMetadataManager,
@@ -238,7 +239,13 @@ const zodIpc = new ZodIpcConnection({
238239
emitSchema: ExecutorToWorkerMessageCatalog,
239240
process,
240241
handlers: {
241-
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics }, sender) => {
242+
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics, env }, sender) => {
243+
if (env) {
244+
populateEnv(env, {
245+
override: true,
246+
});
247+
}
248+
242249
log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution);
243250

244251
standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics);

packages/cli-v3/src/entryPoints/managed-run-controller.ts

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,18 @@ class ManagedRunController {
836836
this.exitProcess(this.successExitCode);
837837
}
838838

839+
if (this.taskRunProcess) {
840+
logger.debug("waitForNextRun: eagerly recreating task run process with options");
841+
this.taskRunProcess = new TaskRunProcess({
842+
...this.taskRunProcess.options,
843+
isWarmStart: true,
844+
}).initialize();
845+
} else {
846+
logger.debug(
847+
"waitForNextRun: no existing task run process, so we can't eagerly recreate it"
848+
);
849+
}
850+
839851
// Check the service is up and get additional warm start config
840852
const connect = await this.warmStartClient.connect();
841853

@@ -904,6 +916,9 @@ class ManagedRunController {
904916

905917
private exitProcess(code?: number): never {
906918
logger.log("Exiting process", { code });
919+
if (this.taskRunProcess?.isPreparedForNextRun) {
920+
this.taskRunProcess.forceExit();
921+
}
907922
process.exit(code);
908923
}
909924

@@ -980,30 +995,33 @@ class ManagedRunController {
980995
}: WorkloadRunAttemptStartResponseBody) {
981996
this.snapshotPoller.start();
982997

983-
this.taskRunProcess = new TaskRunProcess({
984-
workerManifest: this.workerManifest,
985-
env: envVars,
986-
serverWorker: {
987-
id: "unmanaged",
988-
contentHash: env.TRIGGER_CONTENT_HASH,
989-
version: env.TRIGGER_DEPLOYMENT_VERSION,
990-
engine: "V2",
991-
},
992-
payload: {
993-
execution,
994-
traceContext: execution.run.traceContext ?? {},
995-
},
996-
messageId: run.friendlyId,
997-
});
998-
999-
await this.taskRunProcess.initialize();
998+
if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) {
999+
this.taskRunProcess = new TaskRunProcess({
1000+
workerManifest: this.workerManifest,
1001+
env: envVars,
1002+
serverWorker: {
1003+
id: "unmanaged",
1004+
contentHash: env.TRIGGER_CONTENT_HASH,
1005+
version: env.TRIGGER_DEPLOYMENT_VERSION,
1006+
engine: "V2",
1007+
},
1008+
machine: execution.machine,
1009+
}).initialize();
1010+
}
10001011

10011012
logger.log("executing task run process", {
10021013
attemptId: execution.attempt.id,
10031014
runId: execution.run.id,
10041015
});
10051016

1006-
const completion = await this.taskRunProcess.execute();
1017+
const completion = await this.taskRunProcess.execute({
1018+
payload: {
1019+
execution,
1020+
traceContext: execution.run.traceContext ?? {},
1021+
},
1022+
messageId: run.friendlyId,
1023+
env: envVars,
1024+
});
10071025

10081026
logger.log("Completed run", completion);
10091027

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import {
3636
logLevels,
3737
ManagedRuntimeManager,
3838
OtelTaskLogger,
39+
populateEnv,
3940
ProdUsageManager,
4041
StandardLifecycleHooksManager,
4142
StandardLocalsManager,
@@ -248,7 +249,16 @@ const zodIpc = new ZodIpcConnection({
248249
emitSchema: ExecutorToWorkerMessageCatalog,
249250
process,
250251
handlers: {
251-
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics }, sender) => {
252+
EXECUTE_TASK_RUN: async (
253+
{ execution, traceContext, metadata, metrics, env, isWarmStart },
254+
sender
255+
) => {
256+
if (env) {
257+
populateEnv(env, {
258+
override: true,
259+
});
260+
}
261+
252262
standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics);
253263

254264
console.log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution);
@@ -383,6 +393,7 @@ const zodIpc = new ZodIpcConnection({
383393
tracingSDK,
384394
consoleInterceptor,
385395
retries: config.retries,
396+
isWarmStart,
386397
});
387398

388399
try {

packages/cli-v3/src/executions/taskRunProcess.ts

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
CompletedWaitpoint,
33
ExecutorToWorkerMessageCatalog,
4+
MachinePreset,
45
ServerBackgroundWorker,
56
TaskRunErrorCodes,
67
TaskRunExecution,
@@ -48,10 +49,15 @@ export type TaskRunProcessOptions = {
4849
workerManifest: WorkerManifest;
4950
serverWorker: ServerBackgroundWorker;
5051
env: Record<string, string>;
52+
machine: MachinePreset;
53+
isWarmStart?: boolean;
54+
cwd?: string;
55+
};
56+
57+
export type TaskRunProcessExecuteParams = {
5158
payload: TaskRunExecutionPayload;
5259
messageId: string;
53-
54-
cwd?: string;
60+
env?: Record<string, string>;
5561
};
5662

5763
export class TaskRunProcess {
@@ -79,9 +85,18 @@ export class TaskRunProcess {
7985
public onWaitForBatch: Evt<OnWaitForBatchMessage> = new Evt();
8086
public onWait: Evt<OnWaitMessage> = new Evt();
8187

82-
constructor(public readonly options: TaskRunProcessOptions) {}
88+
private _isPreparedForNextRun: boolean = false;
89+
90+
constructor(public readonly options: TaskRunProcessOptions) {
91+
this._isPreparedForNextRun = true;
92+
}
93+
94+
get isPreparedForNextRun() {
95+
return this._isPreparedForNextRun;
96+
}
8397

8498
async cancel() {
99+
this._isPreparedForNextRun = false;
85100
this._isBeingCancelled = true;
86101

87102
try {
@@ -94,6 +109,8 @@ export class TaskRunProcess {
94109
}
95110

96111
async cleanup(kill = true) {
112+
this._isPreparedForNextRun = false;
113+
97114
try {
98115
await this.#flush();
99116
} catch (err) {
@@ -105,25 +122,12 @@ export class TaskRunProcess {
105122
}
106123
}
107124

108-
get runId() {
109-
return this.options.payload.execution.run.id;
110-
}
125+
initialize() {
126+
const { env: $env, workerManifest, cwd, machine } = this.options;
111127

112-
get isTest() {
113-
return this.options.payload.execution.run.isTest;
114-
}
115-
116-
get payload(): TaskRunExecutionPayload {
117-
return this.options.payload;
118-
}
119-
120-
async initialize() {
121-
const { env: $env, workerManifest, cwd, messageId, payload } = this.options;
122-
123-
const maxOldSpaceSize = nodeOptionsWithMaxOldSpaceSize(undefined, payload.execution.machine);
128+
const maxOldSpaceSize = nodeOptionsWithMaxOldSpaceSize(undefined, machine);
124129

125130
const fullEnv = {
126-
...(this.isTest ? { TRIGGER_LOG_LEVEL: "debug" } : {}),
127131
...$env,
128132
OTEL_IMPORT_HOOK_INCLUDES: workerManifest.otelImportHook?.include?.join(","),
129133
// TODO: this will probably need to use something different for bun (maybe --preload?)
@@ -132,7 +136,7 @@ export class TaskRunProcess {
132136
TRIGGER_PROCESS_FORK_START_TIME: String(Date.now()),
133137
};
134138

135-
logger.debug(`[${this.runId}] initializing task run process`, {
139+
logger.debug(`initializing task run process`, {
136140
env: fullEnv,
137141
path: workerManifest.workerEntryPoint,
138142
cwd,
@@ -175,13 +179,13 @@ export class TaskRunProcess {
175179

176180
resolver(result);
177181
},
178-
READY_TO_DISPOSE: async (message) => {
179-
logger.debug(`[${this.runId}] task run process is ready to dispose`);
182+
READY_TO_DISPOSE: async () => {
183+
logger.debug(`task run process is ready to dispose`);
180184

181185
this.onReadyToDispose.post(this);
182186
},
183187
TASK_HEARTBEAT: async (message) => {
184-
this.onTaskRunHeartbeat.post(messageId);
188+
this.onTaskRunHeartbeat.post(message.id);
185189
},
186190
WAIT_FOR_TASK: async (message) => {
187191
this.onWaitForTask.post(message);
@@ -190,14 +194,16 @@ export class TaskRunProcess {
190194
this.onWaitForBatch.post(message);
191195
},
192196
UNCAUGHT_EXCEPTION: async (message) => {
193-
logger.debug(`[${this.runId}] uncaught exception in task run process`, { ...message });
197+
logger.debug("uncaught exception in task run process", { ...message });
194198
},
195199
},
196200
});
197201

198202
this._child.on("exit", this.#handleExit.bind(this));
199203
this._child.stdout?.on("data", this.#handleLog.bind(this));
200204
this._child.stderr?.on("data", this.#handleStdErr.bind(this));
205+
206+
return this;
201207
}
202208

203209
async #flush(timeoutInMs: number = 5_000) {
@@ -206,7 +212,9 @@ export class TaskRunProcess {
206212
await this._ipc?.sendWithAck("FLUSH", { timeoutInMs }, timeoutInMs + 1_000);
207213
}
208214

209-
async execute(): Promise<TaskRunExecutionResult> {
215+
async execute(params: TaskRunProcessExecuteParams): Promise<TaskRunExecutionResult> {
216+
this._isPreparedForNextRun = false;
217+
210218
let resolver: (value: TaskRunExecutionResult) => void;
211219
let rejecter: (err?: any) => void;
212220

@@ -215,19 +223,19 @@ export class TaskRunProcess {
215223
rejecter = reject;
216224
});
217225

218-
this._attemptStatuses.set(this.payload.execution.attempt.id, "PENDING");
226+
this._attemptStatuses.set(params.payload.execution.attempt.id, "PENDING");
219227

220228
// @ts-expect-error - We know that the resolver and rejecter are defined
221-
this._attemptPromises.set(this.payload.execution.attempt.id, { resolver, rejecter });
229+
this._attemptPromises.set(params.payload.execution.attempt.id, { resolver, rejecter });
222230

223-
const { execution, traceContext, metrics } = this.payload;
231+
const { execution, traceContext, metrics } = params.payload;
224232

225233
this._currentExecution = execution;
226234

227235
if (this._child?.connected && !this._isBeingKilled && !this._child.killed) {
228236
logger.debug(
229237
`[${new Date().toISOString()}][${
230-
this.runId
238+
params.payload.execution.run.id
231239
}] sending EXECUTE_TASK_RUN message to task run process`,
232240
{
233241
pid: this.pid,
@@ -239,6 +247,8 @@ export class TaskRunProcess {
239247
traceContext,
240248
metadata: this.options.serverWorker,
241249
metrics,
250+
env: params.env,
251+
isWarmStart: this.options.isWarmStart,
242252
});
243253
}
244254

@@ -398,7 +408,7 @@ export class TaskRunProcess {
398408
}
399409

400410
async kill(signal?: number | NodeJS.Signals, timeoutInMs?: number) {
401-
logger.debug(`[${this.runId}] killing task run process`, {
411+
logger.debug(`killing task run process`, {
402412
signal,
403413
timeoutInMs,
404414
pid: this.pid,
@@ -417,6 +427,16 @@ export class TaskRunProcess {
417427
}
418428
}
419429

430+
forceExit() {
431+
try {
432+
this._isBeingKilled = true;
433+
434+
this._child?.kill("SIGKILL");
435+
} catch (error) {
436+
logger.debug("forceExit: failed to kill child process", { error });
437+
}
438+
}
439+
420440
get isBeingKilled() {
421441
return this._isBeingKilled || this._child?.killed;
422442
}

packages/core/src/v3/schemas/messages.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,8 @@ export const WorkerToExecutorMessageCatalog = {
215215
traceContext: z.record(z.unknown()),
216216
metadata: ServerBackgroundWorker,
217217
metrics: TaskRunExecutionMetrics.optional(),
218+
env: z.record(z.string()).optional(),
219+
isWarmStart: z.boolean().optional(),
218220
}),
219221
},
220222
TASK_RUN_COMPLETED_NOTIFICATION: {

packages/core/src/v3/semanticInternalAttributes.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export const SemanticInternalAttributes = {
2323
MACHINE_PRESET_CPU: "ctx.machine.cpu",
2424
MACHINE_PRESET_MEMORY: "ctx.machine.memory",
2525
MACHINE_PRESET_CENTS_PER_MS: "ctx.machine.centsPerMs",
26+
SKIP_SPAN_PARTIAL: "$span.skip_partial",
2627
SPAN_PARTIAL: "$span.partial",
2728
SPAN_ID: "$span.span_id",
2829
ENTITY_TYPE: "$entity.type",
@@ -58,4 +59,5 @@ export const SemanticInternalAttributes = {
5859
SPAN_ATTEMPT: "$span.attempt",
5960
METRIC_EVENTS: "$metrics.events",
6061
EXECUTION_ENVIRONMENT: "exec_env",
62+
WARM_START: "warm_start",
6163
};

0 commit comments

Comments
 (0)