Skip to content

Commit 1643743

Browse files
committed
execution fixes and logging improvements
1 parent fdf14e0 commit 1643743

File tree

3 files changed

+526
-661
lines changed

3 files changed

+526
-661
lines changed

apps/supervisor/src/workloadServer/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
452452
logger.debug("runConnected", { ...getSocketMetadata() });
453453

454454
// If there's already a run ID set, we should "disconnect" it from this socket
455-
if (socket.data.runFriendlyId) {
455+
if (socket.data.runFriendlyId && socket.data.runFriendlyId !== friendlyId) {
456456
logger.debug("runConnected: disconnecting existing run", {
457457
...getSocketMetadata(),
458458
newRunId: friendlyId,

packages/cli-v3/src/entryPoints/managed/controller.ts

Lines changed: 86 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -148,60 +148,49 @@ export class ManagedRunController {
148148

149149
private lockedRunExecution: Promise<void> | null = null;
150150

151-
private async startAndExecuteRunAttempt({
151+
private async startRunExecution({
152152
runFriendlyId,
153153
snapshotFriendlyId,
154154
dequeuedAt,
155155
podScheduledAt,
156156
isWarmStart,
157+
previousRunId,
157158
}: {
158159
runFriendlyId: string;
159160
snapshotFriendlyId: string;
160161
dequeuedAt?: Date;
161162
podScheduledAt?: Date;
162163
isWarmStart?: boolean;
164+
previousRunId?: string;
163165
}) {
166+
this.sendDebugLog({
167+
runId: runFriendlyId,
168+
message: "startAndExecuteRunAttempt()",
169+
properties: { previousRunId },
170+
});
171+
164172
if (this.lockedRunExecution) {
165173
this.sendDebugLog({
166174
runId: runFriendlyId,
167-
message: "startAndExecuteRunAttempt: already in progress",
175+
message: "startAndExecuteRunAttempt: execution already locked",
168176
});
169177
return;
170178
}
171179

172-
this.sendDebugLog({
173-
runId: runFriendlyId,
174-
message: "startAndExecuteRunAttempt: called",
175-
});
176-
177180
const execution = async () => {
178-
if (!this.socket) {
179-
this.sendDebugLog({
180-
runId: runFriendlyId,
181-
message: "Starting run without socket connection",
182-
});
183-
}
184-
185-
// Create a new RunExecution instance for this attempt
186-
const newExecution = new RunExecution({
187-
workerManifest: this.workerManifest,
188-
env: this.env,
189-
httpClient: this.httpClient,
190-
logger: this.logger,
191-
});
192-
193-
// If we have a current execution with task run env, prepare the new execution
194-
if (this.currentExecution?.taskRunEnv) {
195-
newExecution.prepareForExecution({
196-
taskRunEnv: this.currentExecution.taskRunEnv,
181+
if (!this.currentExecution || !this.currentExecution.isPreparedForNextRun) {
182+
this.currentExecution = new RunExecution({
183+
workerManifest: this.workerManifest,
184+
env: this.env,
185+
httpClient: this.httpClient,
186+
logger: this.logger,
197187
});
198188
}
199189

200-
this.currentExecution = newExecution;
201-
202190
// Subscribe to run notifications
203191
this.subscribeToRunNotifications(runFriendlyId, snapshotFriendlyId);
204192

193+
// We're prepared for the next run so we can start executing
205194
await this.currentExecution.execute({
206195
runFriendlyId,
207196
snapshotFriendlyId,
@@ -223,50 +212,74 @@ export class ManagedRunController {
223212
});
224213
}
225214

215+
const metrics = this.currentExecution?.metrics;
216+
217+
if (metrics?.restoreCount) {
218+
this.restoreCount += metrics.restoreCount;
219+
}
220+
226221
this.lockedRunExecution = null;
227222
this.unsubscribeFromRunNotifications(runFriendlyId, snapshotFriendlyId);
228223
this.waitForNextRun();
229224
}
230225

231226
private waitForNextRunLock = false;
232227

233-
/** This will kill the child process before spinning up a new one. It will never throw,
234-
* but may exit the process on any errors or when no runs are available after the
235-
* configured duration. */
228+
/**
229+
* This will eagerly create a new run execution. It will never throw, but may exit
230+
* the process on any errors or when no runs are available after the configured duration.
231+
*/
236232
private async waitForNextRun() {
233+
this.sendDebugLog({
234+
runId: this.runFriendlyId,
235+
message: "waitForNextRun()",
236+
});
237+
237238
if (this.waitForNextRunLock) {
238239
this.sendDebugLog({
239240
runId: this.runFriendlyId,
240-
message: "waitForNextRun: already in progress",
241+
message: "waitForNextRun: already in progress, skipping",
242+
});
243+
return;
244+
}
245+
246+
if (this.lockedRunExecution) {
247+
this.sendDebugLog({
248+
runId: this.runFriendlyId,
249+
message: "waitForNextRun: execution locked, skipping",
241250
});
242251
return;
243252
}
244253

245254
this.waitForNextRunLock = true;
246-
const previousRunId = this.runFriendlyId;
247255

248256
try {
249-
// If there's a run execution in progress, we need to wait for it to finish
250-
if (this.lockedRunExecution) {
257+
if (!this.warmStartClient) {
251258
this.sendDebugLog({
252259
runId: this.runFriendlyId,
253-
message: "waitForNextRun: waiting for existing run execution to finish",
260+
message: "waitForNextRun: warm starts disabled, shutting down",
254261
});
255-
// TODO: maybe kill the process?
256-
await this.lockedRunExecution;
262+
this.exitProcess(this.successExitCode);
257263
}
258264

259-
this.sendDebugLog({
260-
runId: this.runFriendlyId,
261-
message: "waitForNextRun: waiting for next run",
262-
});
265+
const previousRunId = this.runFriendlyId;
263266

264-
if (!this.warmStartClient) {
267+
if (this.currentExecution?.taskRunEnv) {
265268
this.sendDebugLog({
266269
runId: this.runFriendlyId,
267-
message: "waitForNextRun: warm starts disabled, shutting down",
270+
message: "waitForNextRun: eagerly recreating task run process",
271+
});
272+
273+
const previousTaskRunEnv = this.currentExecution.taskRunEnv;
274+
275+
this.currentExecution = new RunExecution({
276+
workerManifest: this.workerManifest,
277+
env: this.env,
278+
httpClient: this.httpClient,
279+
logger: this.logger,
280+
}).prepareForExecution({
281+
taskRunEnv: previousTaskRunEnv,
268282
});
269-
this.exitProcess(this.successExitCode);
270283
}
271284

272285
// Check the service is up and get additional warm start config
@@ -288,34 +301,22 @@ export class ManagedRunController {
288301
connect.data.connectionTimeoutMs ?? this.env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS;
289302
const keepaliveMs = connect.data.keepaliveMs ?? this.env.TRIGGER_WARM_START_KEEPALIVE_MS;
290303

304+
const warmStartConfig = {
305+
connectionTimeoutMs,
306+
keepaliveMs,
307+
};
308+
291309
this.sendDebugLog({
292310
runId: this.runFriendlyId,
293311
message: "waitForNextRun: connected to warm start service",
294-
properties: {
295-
connectionTimeoutMs,
296-
keepaliveMs,
297-
},
312+
properties: warmStartConfig,
298313
});
299314

300-
if (previousRunId) {
301-
this.sendDebugLog({
302-
runId: previousRunId,
303-
message: "warm start: received config",
304-
properties: {
305-
connectionTimeoutMs,
306-
keepaliveMs,
307-
},
308-
});
309-
}
310-
311315
if (!connectionTimeoutMs || !keepaliveMs) {
312316
this.sendDebugLog({
313317
runId: this.runFriendlyId,
314318
message: "waitForNextRun: warm starts disabled after connect",
315-
properties: {
316-
connectionTimeoutMs,
317-
keepaliveMs,
318-
},
319+
properties: warmStartConfig,
319320
});
320321
this.exitProcess(this.successExitCode);
321322
}
@@ -330,6 +331,7 @@ export class ManagedRunController {
330331
this.sendDebugLog({
331332
runId: this.runFriendlyId,
332333
message: "waitForNextRun: warm start failed, shutting down",
334+
properties: warmStartConfig,
333335
});
334336
this.exitProcess(this.successExitCode);
335337
}
@@ -339,14 +341,18 @@ export class ManagedRunController {
339341
this.sendDebugLog({
340342
runId: this.runFriendlyId,
341343
message: "waitForNextRun: got next run",
342-
properties: { nextRun: nextRun.run.friendlyId },
344+
properties: {
345+
...warmStartConfig,
346+
nextRunId: nextRun.run.friendlyId,
347+
},
343348
});
344349

345-
this.startAndExecuteRunAttempt({
350+
this.startRunExecution({
346351
runFriendlyId: nextRun.run.friendlyId,
347352
snapshotFriendlyId: nextRun.snapshot.friendlyId,
348353
dequeuedAt: nextRun.dequeuedAt,
349354
isWarmStart: true,
355+
previousRunId,
350356
}).finally(() => {});
351357
} catch (error) {
352358
this.sendDebugLog({
@@ -454,27 +460,35 @@ export class ManagedRunController {
454460
socket.on("connect", () => {
455461
this.sendDebugLog({
456462
runId: this.runFriendlyId,
457-
message: "Connected to supervisor",
463+
message: "Socket connected to supervisor",
458464
});
459465

460466
// This should handle the case where we reconnect after being restored
461-
if (this.runFriendlyId && this.snapshotFriendlyId) {
467+
if (
468+
this.runFriendlyId &&
469+
this.snapshotFriendlyId &&
470+
this.runFriendlyId !== this.env.TRIGGER_RUN_ID
471+
) {
472+
this.sendDebugLog({
473+
runId: this.runFriendlyId,
474+
message: "Subscribing to notifications for in-progress run",
475+
});
462476
this.subscribeToRunNotifications(this.runFriendlyId, this.snapshotFriendlyId);
463477
}
464478
});
465479

466480
socket.on("connect_error", (error) => {
467481
this.sendDebugLog({
468482
runId: this.runFriendlyId,
469-
message: "Connection error",
483+
message: "Socket connection error",
470484
properties: { error: error instanceof Error ? error.message : String(error) },
471485
});
472486
});
473487

474488
socket.on("disconnect", (reason, description) => {
475489
this.sendDebugLog({
476490
runId: this.runFriendlyId,
477-
message: "Disconnected from supervisor",
491+
message: "Socket disconnected from supervisor",
478492
properties: { reason, description: description?.toString() },
479493
});
480494
});
@@ -500,7 +514,7 @@ export class ManagedRunController {
500514

501515
// If we have run and snapshot IDs, we can start an attempt immediately
502516
if (this.env.TRIGGER_RUN_ID && this.env.TRIGGER_SNAPSHOT_ID) {
503-
this.startAndExecuteRunAttempt({
517+
this.startRunExecution({
504518
runFriendlyId: this.env.TRIGGER_RUN_ID,
505519
snapshotFriendlyId: this.env.TRIGGER_SNAPSHOT_ID,
506520
dequeuedAt: this.env.TRIGGER_DEQUEUED_AT_MS,
@@ -527,6 +541,7 @@ export class ManagedRunController {
527541
sendDebugLog(opts: SendDebugLogOptions) {
528542
this.logger.sendDebugLog({
529543
...opts,
544+
message: `[controller] ${opts.message}`,
530545
properties: {
531546
...opts.properties,
532547
runnerWarmStartCount: this.warmStartCount,

0 commit comments

Comments
 (0)