Commit d893b26

fix(engine): store costInCents and usageDurationMs on the TaskRun table via existing run engine updates (#2926)
Moving usage updates into the run engine avoids inefficient, additional incremental updates to the TaskRun table. The read/modify/write pattern is safe inside the run engine because of the run lock. We can also now cap the usageDurationMs value so that it no longer overflows and causes an error.

## Why?

This removes at least one update per TaskRun; the values are instead written by piggybacking on other updates.

## Aurora PostgreSQL Reader Consistency Notes

### TL;DR

Aurora readers share the same storage as the writer but maintain separate in-memory page caches. This means:

- **Storage is always consistent** - writes are synchronously committed to shared storage
- **Page cache can lag** - typically <100ms, but can cause stale reads if data is cached

### How It Works

1. Writer commits to shared storage (synchronous 4/6 quorum)
2. Writer sends cache invalidation messages to readers (asynchronous)
3. If reader has data in cache → returns cached (potentially stale) value
4. If reader has cache miss → fetches from shared storage (always current)

### Monitoring

```sql
SELECT server_id,
  CASE WHEN session_id = 'MASTER_SESSION_ID' THEN 'Writer' ELSE 'Reader' END AS role,
  replica_lag_in_msec
FROM aurora_replica_status();
```
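The "modify" step of the read/modify/write update described in the commit message, including the cap on usageDurationMs, might look roughly like the sketch below. This is an illustration, not the run engine's code: `addAttemptUsage`, `UsageTotals`, `MAX_USAGE_DURATION_MS`, and the `centsPerMs` parameter are hypothetical names, and the cap value assumes usageDurationMs is stored in a 32-bit signed integer column.

```typescript
// Hypothetical sketch; none of these names come from the run engine itself.

// Assumption: usageDurationMs lives in a 32-bit signed integer column,
// so the largest value that can be stored without an overflow error is 2^31 - 1.
const MAX_USAGE_DURATION_MS = 2_147_483_647;

type UsageTotals = {
  usageDurationMs: number;
  costInCents: number;
};

// Pure "modify" step of a read/modify/write cycle. The caller is assumed to
// hold the run lock, so `current` cannot change between the read and the write.
function addAttemptUsage(
  current: UsageTotals,
  attemptDurationMs: number,
  centsPerMs: number
): UsageTotals {
  // Ignore negative or fractional durations before accumulating.
  const durationMs = Math.max(0, Math.floor(attemptDurationMs));

  return {
    // Clamp the running total instead of letting the database reject the write.
    usageDurationMs: Math.min(
      current.usageDurationMs + durationMs,
      MAX_USAGE_DURATION_MS
    ),
    costInCents: current.costInCents + durationMs * centsPerMs,
  };
}

// Usage: fold one attempt's usage into the totals read earlier under the lock.
const updatedTotals = addAttemptUsage(
  { usageDurationMs: 1_000, costInCents: 0.5 },
  2_000,
  0.001
);
console.log(updatedTotals);
```

Because the new totals are computed in memory, they can be written as part of an update the run engine already performs, rather than as a separate incremental update.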
1 parent 6f26acb commit d893b26

3 files changed, +437 -51 lines changed

internal-packages/run-engine/src/engine/retrying.ts

Lines changed: 33 additions & 3 deletions
@@ -37,6 +37,10 @@ export type RetryOutcome =
       settings: TaskRunExecutionRetry;
       machine?: string;
       wasOOMError?: boolean;
+      // Current usage values for calculating updated totals
+      usageDurationMs: number;
+      costInCents: number;
+      machinePreset: string | null;
     };
 
 export async function retryOutcomeFromCompletion(
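
A caller could read the new usage fields by narrowing on the `outcome` discriminant, as in the hedged sketch below. `RetryOutcomeSketch` is a simplified stand-in for the real `RetryOutcome` union: it keeps only fields visible in this diff and reduces the non-retry case to a bare variant. `UsageSnapshot` and `usageSnapshotFrom` are hypothetical names.

```typescript
// Simplified stand-in for RetryOutcome; the real union has more variants and fields.
type RetryOutcomeSketch =
  | { outcome: "fail_run" }
  | {
      outcome: "retry";
      method: "queue" | "immediate";
      usageDurationMs: number;
      costInCents: number;
      machinePreset: string | null;
    };

// Hypothetical shape for the usage values carried on a retry outcome.
type UsageSnapshot = {
  usageDurationMs: number;
  costInCents: number;
  machinePreset: string | null;
};

// Returns the usage values for retry outcomes, or undefined for any other outcome.
function usageSnapshotFrom(outcome: RetryOutcomeSketch): UsageSnapshot | undefined {
  if (outcome.outcome !== "retry") {
    return undefined;
  }
  // After narrowing, TypeScript knows the usage fields are present.
  const { usageDurationMs, costInCents, machinePreset } = outcome;
  return { usageDurationMs, costInCents, machinePreset };
}
```

Carrying the values on the outcome lets the caller compute new totals without issuing a second read of the TaskRun row.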
@@ -70,6 +74,9 @@ export async function retryOutcomeFromCompletion(
         machine: oomResult.machine,
         settings: { timestamp: Date.now() + delay, delay },
         wasOOMError: true,
+        usageDurationMs: oomResult.usageDurationMs,
+        costInCents: oomResult.costInCents,
+        machinePreset: oomResult.machinePreset,
       };
     }
 
@@ -87,14 +94,17 @@ export async function retryOutcomeFromCompletion(
     return { outcome: "fail_run", sanitizedError };
   }
 
-  // Get the run settings
+  // Get the run settings and current usage values
   const run = await prisma.taskRun.findFirst({
     where: {
       id: runId,
     },
     select: {
       maxAttempts: true,
       lockedRetryConfig: true,
+      usageDurationMs: true,
+      costInCents: true,
+      machinePreset: true,
     },
   });
 
@@ -151,20 +161,32 @@ export async function retryOutcomeFromCompletion(
       outcome: "retry",
       method: "queue", // we'll always retry on the queue because usually having no settings means something bad happened
       settings: retrySettings,
+      usageDurationMs: run.usageDurationMs,
+      costInCents: run.costInCents,
+      machinePreset: run.machinePreset,
     };
   }
 
   return {
     outcome: "retry",
     method: retryUsingQueue ? "queue" : "immediate",
     settings: retrySettings,
+    usageDurationMs: run.usageDurationMs,
+    costInCents: run.costInCents,
+    machinePreset: run.machinePreset,
   };
 }
 
 async function retryOOMOnMachine(
   prisma: PrismaClientOrTransaction,
   runId: string
-): Promise<{ machine: string; retrySettings: RetryOptions } | undefined> {
+): Promise<{
+  machine: string;
+  retrySettings: RetryOptions;
+  usageDurationMs: number;
+  costInCents: number;
+  machinePreset: string | null;
+} | undefined> {
   try {
     const run = await prisma.taskRun.findFirst({
       where: {
@@ -173,6 +195,8 @@ async function retryOOMOnMachine(
       select: {
         machinePreset: true,
         lockedRetryConfig: true,
+        usageDurationMs: true,
+        costInCents: true,
       },
     });
 
@@ -201,7 +225,13 @@ async function retryOOMOnMachine(
       return;
     }
 
-    return { machine: retryMachine, retrySettings: parsedRetryConfig.data };
+    return {
+      machine: retryMachine,
+      retrySettings: parsedRetryConfig.data,
+      usageDurationMs: run.usageDurationMs,
+      costInCents: run.costInCents,
+      machinePreset: run.machinePreset,
+    };
   } catch (error) {
     console.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
       runId,
