Skip to content

Commit 7d65962

Browse files
committed
address comments
1 parent 8b40252 commit 7d65962

9 files changed

Lines changed: 355 additions & 116 deletions

File tree

apps/sim/app/api/schedules/execute/route.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const {
2020
mockExecuteJobInline,
2121
mockFeatureFlags,
2222
mockEnqueue,
23+
mockGetJob,
2324
mockStartJob,
2425
mockCompleteJob,
2526
mockMarkJobFailed,
@@ -34,6 +35,7 @@ const {
3435
isDev: true,
3536
},
3637
mockEnqueue: vi.fn().mockResolvedValue('job-id-1'),
38+
mockGetJob: vi.fn().mockResolvedValue(null),
3739
mockStartJob: vi.fn().mockResolvedValue(undefined),
3840
mockCompleteJob: vi.fn().mockResolvedValue(undefined),
3941
mockMarkJobFailed: vi.fn().mockResolvedValue(undefined),
@@ -54,6 +56,7 @@ vi.mock('@/lib/core/config/feature-flags', () => mockFeatureFlags)
5456
vi.mock('@/lib/core/async-jobs', () => ({
5557
getJobQueue: vi.fn().mockResolvedValue({
5658
enqueue: mockEnqueue,
59+
getJob: mockGetJob,
5760
startJob: mockStartJob,
5861
completeJob: mockCompleteJob,
5962
markJobFailed: mockMarkJobFailed,
@@ -265,6 +268,8 @@ describe('Scheduled Workflow Execution API Route', () => {
265268
requestId: 'test-request-id',
266269
}),
267270
expect.objectContaining({
271+
jobId: expect.stringMatching(/^schedule_[0-9a-f]{32}$/),
272+
concurrencyKey: expect.stringMatching(/^schedule_[0-9a-f]{32}$/),
268273
metadata: expect.objectContaining({
269274
workflowId: 'workflow-1',
270275
workspaceId: 'workspace-1',

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import { db, workflowDeploymentVersion, workflowSchedule } from '@sim/db'
22
import { createLogger } from '@sim/logger'
3+
import { sha256Hex } from '@sim/security/hash'
34
import { toError } from '@sim/utils/errors'
45
import { generateId } from '@sim/utils/id'
6+
import { Cron } from 'croner'
57
import { and, eq, inArray, isNull, lt, lte, ne, not, or, sql } from 'drizzle-orm'
68
import { type NextRequest, NextResponse } from 'next/server'
79
import { verifyCronAuth } from '@/lib/auth/internal'
810
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
11+
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
912
import { generateRequestId } from '@/lib/core/utils/request'
1013
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
1114
import {
@@ -15,11 +18,13 @@ import {
1518
} from '@/background/schedule-execution'
1619

1720
export const dynamic = 'force-dynamic'
21+
export const maxDuration = 3600
1822

1923
const logger = createLogger('ScheduledExecuteAPI')
2024
const MAX_CRON_CLAIMS = 20
2125
const RESERVED_WORKFLOW_CLAIMS = 10
2226
const RESERVED_JOB_CLAIMS = MAX_CRON_CLAIMS - RESERVED_WORKFLOW_CLAIMS
27+
const STALE_SCHEDULE_CLAIM_MS = getMaxExecutionTimeout()
2328

2429
const dueFilter = (queuedAt: Date) =>
2530
and(
@@ -29,7 +34,8 @@ const dueFilter = (queuedAt: Date) =>
2934
ne(workflowSchedule.status, 'completed'),
3035
or(
3136
isNull(workflowSchedule.lastQueuedAt),
32-
lt(workflowSchedule.lastQueuedAt, workflowSchedule.nextRunAt)
37+
lt(workflowSchedule.lastQueuedAt, workflowSchedule.nextRunAt),
38+
lt(workflowSchedule.lastQueuedAt, new Date(queuedAt.getTime() - STALE_SCHEDULE_CLAIM_MS))
3339
)
3440
)
3541

@@ -46,6 +52,22 @@ const workflowScheduleFilter = (queuedAt: Date) =>
4652
const jobScheduleFilter = (queuedAt: Date) =>
4753
and(dueFilter(queuedAt), eq(workflowSchedule.sourceType, 'job'))
4854

55+
function buildScheduleExecutionJobId(schedule: {
56+
id: string
57+
nextRunAt?: Date | null
58+
lastQueuedAt?: Date | null
59+
}): string {
60+
const occurrence =
61+
schedule.nextRunAt?.toISOString() ?? schedule.lastQueuedAt?.toISOString() ?? 'due'
62+
return `schedule_${sha256Hex(`${schedule.id}:${occurrence}`).slice(0, 32)}`
63+
}
64+
65+
function getNextRunFromCronExpression(cronExpression?: string | null): Date | null {
66+
if (!cronExpression) return null
67+
const cron = new Cron(cronExpression)
68+
return cron.nextRun()
69+
}
70+
4971
async function claimWorkflowSchedules(queuedAt: Date, limit: number) {
5072
if (limit <= 0) return []
5173

@@ -182,12 +204,40 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
182204
}
183205

184206
try {
207+
const scheduleJobId = buildScheduleExecutionJobId(schedule)
208+
const existingJob = await jobQueue.getJob(scheduleJobId)
209+
if (existingJob && ['pending', 'processing'].includes(existingJob.status)) {
210+
logger.info(`[${requestId}] Schedule execution job already exists`, {
211+
scheduleId: schedule.id,
212+
jobId: scheduleJobId,
213+
status: existingJob.status,
214+
})
215+
return
216+
}
217+
if (existingJob) {
218+
logger.info(`[${requestId}] Releasing stale schedule claim for finished job`, {
219+
scheduleId: schedule.id,
220+
jobId: scheduleJobId,
221+
status: existingJob.status,
222+
})
223+
await releaseScheduleLock(
224+
schedule.id,
225+
requestId,
226+
queuedAt,
227+
`Released stale schedule ${schedule.id} for finished job ${scheduleJobId}`,
228+
getNextRunFromCronExpression(schedule.cronExpression)
229+
)
230+
return
231+
}
232+
185233
const resolvedWorkflow = schedule.workflowId
186234
? await workflowUtils?.getWorkflowById(schedule.workflowId)
187235
: null
188236
const resolvedWorkspaceId = resolvedWorkflow?.workspaceId
189237

190238
const jobId = await jobQueue.enqueue('schedule-execution', payload, {
239+
jobId: scheduleJobId,
240+
concurrencyKey: scheduleJobId,
191241
metadata: {
192242
workflowId: schedule.workflowId ?? undefined,
193243
workspaceId: resolvedWorkspaceId ?? undefined,
@@ -198,6 +248,23 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
198248
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
199249
)
200250

251+
const queuedJob = await jobQueue.getJob(jobId)
252+
if (queuedJob && !['pending', 'processing'].includes(queuedJob.status)) {
253+
logger.info(`[${requestId}] Schedule execution job already finished`, {
254+
scheduleId: schedule.id,
255+
jobId,
256+
status: queuedJob.status,
257+
})
258+
await releaseScheduleLock(
259+
schedule.id,
260+
requestId,
261+
queuedAt,
262+
`Released stale schedule ${schedule.id} for finished job ${jobId}`,
263+
getNextRunFromCronExpression(schedule.cronExpression)
264+
)
265+
return
266+
}
267+
201268
if (shouldExecuteInline()) {
202269
try {
203270
await jobQueue.startJob(jobId)

apps/sim/background/schedule-execution.ts

Lines changed: 100 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,13 @@ import { generateId } from '@sim/utils/id'
1111
import { task } from '@trigger.dev/sdk'
1212
import { Cron } from 'croner'
1313
import { and, eq, isNull } from 'drizzle-orm'
14+
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
1415
import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types'
15-
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
16+
import {
17+
createTimeoutAbortController,
18+
getExecutionTimeout,
19+
getTimeoutErrorMessage,
20+
} from '@/lib/core/execution-limits'
1621
import { preprocessExecution } from '@/lib/execution/preprocessing'
1722
import { LoggingSession } from '@/lib/logs/execution/logging-session'
1823
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
@@ -1007,6 +1012,8 @@ export async function executeJobInline(payload: JobExecutionPayload) {
10071012
const promptText = buildJobPrompt(jobRecord)
10081013

10091014
try {
1015+
const userSubscription = await getHighestPrioritySubscription(jobRecord.sourceUserId)
1016+
const mothershipJobTimeoutMs = getExecutionTimeout(userSubscription?.plan, 'sync')
10101017
const url = buildAPIUrl('/api/mothership/execute')
10111018
const headers = await buildAuthHeaders(jobRecord.sourceUserId)
10121019

@@ -1018,16 +1025,52 @@ export async function executeJobInline(payload: JobExecutionPayload) {
10181025
}
10191026

10201027
const startTime = new Date()
1021-
const response = await fetch(url.toString(), {
1022-
method: 'POST',
1023-
headers,
1024-
body: JSON.stringify(body),
1025-
})
1026-
const endTime = new Date()
1027-
const durationMs = endTime.getTime() - startTime.getTime()
1028+
const timeoutController = createTimeoutAbortController(mothershipJobTimeoutMs)
1029+
try {
1030+
const response = await fetch(url.toString(), {
1031+
method: 'POST',
1032+
headers,
1033+
body: JSON.stringify(body),
1034+
signal: timeoutController.signal,
1035+
})
1036+
1037+
if (!response.ok) {
1038+
const errorText = await response.text().catch(() => {
1039+
if (timeoutController.isTimedOut()) {
1040+
throw new Error(getTimeoutErrorMessage(null, timeoutController.timeoutMs))
1041+
}
1042+
return 'Unknown error'
1043+
})
1044+
const endTime = new Date()
1045+
const durationMs = endTime.getTime() - startTime.getTime()
10281046

1029-
if (!response.ok) {
1030-
const errorText = await response.text().catch(() => 'Unknown error')
1047+
await createJobLogEntry({
1048+
scheduleId: payload.scheduleId,
1049+
workspaceId: jobRecord.sourceWorkspaceId,
1050+
jobTitle: jobRecord.jobTitle,
1051+
startTime,
1052+
endTime,
1053+
durationMs,
1054+
success: false,
1055+
errorMessage: errorText,
1056+
})
1057+
1058+
throw new Error(`Mothership execution failed (${response.status}): ${errorText}`)
1059+
}
1060+
1061+
let responseBody: Record<string, any> = {}
1062+
let wasCompletedByTool = false
1063+
try {
1064+
responseBody = await response.json()
1065+
const toolCalls = responseBody?.toolCalls as Array<{ name?: string }> | undefined
1066+
wasCompletedByTool = toolCalls?.some((tc) => tc.name === 'complete_job') ?? false
1067+
} catch {
1068+
if (timeoutController.isTimedOut()) {
1069+
throw new Error(getTimeoutErrorMessage(null, timeoutController.timeoutMs))
1070+
}
1071+
}
1072+
const endTime = new Date()
1073+
const durationMs = endTime.getTime() - startTime.getTime()
10311074

10321075
await createJobLogEntry({
10331076
scheduleId: payload.scheduleId,
@@ -1036,92 +1079,71 @@ export async function executeJobInline(payload: JobExecutionPayload) {
10361079
startTime,
10371080
endTime,
10381081
durationMs,
1039-
success: false,
1040-
errorMessage: errorText,
1082+
success: true,
1083+
responseBody,
10411084
})
10421085

1043-
throw new Error(`Mothership execution failed (${response.status}): ${errorText}`)
1044-
}
1086+
const newRunCount = (jobRecord.runCount || 0) + 1
10451087

1046-
let responseBody: Record<string, any> = {}
1047-
let wasCompletedByTool = false
1048-
try {
1049-
responseBody = await response.json()
1050-
const toolCalls = responseBody?.toolCalls as Array<{ name?: string }> | undefined
1051-
wasCompletedByTool = toolCalls?.some((tc) => tc.name === 'complete_job') ?? false
1052-
} catch {
1053-
// Response may not be JSON; proceed with normal flow
1054-
}
1088+
logger.info(`[${requestId}] Job executed successfully`, {
1089+
scheduleId: payload.scheduleId,
1090+
runCount: newRunCount,
1091+
wasCompletedByTool,
1092+
})
10551093

1056-
await createJobLogEntry({
1057-
scheduleId: payload.scheduleId,
1058-
workspaceId: jobRecord.sourceWorkspaceId,
1059-
jobTitle: jobRecord.jobTitle,
1060-
startTime,
1061-
endTime,
1062-
durationMs,
1063-
success: true,
1064-
responseBody,
1065-
})
1094+
if (wasCompletedByTool) {
1095+
await applyScheduleUpdate(
1096+
payload.scheduleId,
1097+
{
1098+
lastRanAt: now,
1099+
updatedAt: now,
1100+
runCount: newRunCount,
1101+
failedCount: 0,
1102+
lastQueuedAt: null,
1103+
},
1104+
requestId,
1105+
`Error updating job ${payload.scheduleId} after completion`
1106+
)
1107+
return
1108+
}
10661109

1067-
const newRunCount = (jobRecord.runCount || 0) + 1
1110+
const isOneTime = !jobRecord.cronExpression
1111+
let nextRunAt: Date | null = null
10681112

1069-
logger.info(`[${requestId}] Job executed successfully`, {
1070-
scheduleId: payload.scheduleId,
1071-
runCount: newRunCount,
1072-
wasCompletedByTool,
1073-
})
1113+
if (!isOneTime && jobRecord.cronExpression) {
1114+
const validation = validateCronExpression(
1115+
jobRecord.cronExpression,
1116+
jobRecord.timezone || 'UTC'
1117+
)
1118+
nextRunAt = validation.nextRun || null
1119+
}
1120+
1121+
const maxRunsReached = jobRecord.maxRuns && newRunCount >= jobRecord.maxRuns
1122+
if (maxRunsReached) {
1123+
logger.info(`[${requestId}] Job hit maxRuns limit`, {
1124+
scheduleId: payload.scheduleId,
1125+
maxRuns: jobRecord.maxRuns,
1126+
runCount: newRunCount,
1127+
})
1128+
}
10741129

1075-
if (wasCompletedByTool) {
10761130
await applyScheduleUpdate(
10771131
payload.scheduleId,
10781132
{
10791133
lastRanAt: now,
10801134
updatedAt: now,
1081-
runCount: newRunCount,
1135+
nextRunAt: isOneTime || maxRunsReached ? null : nextRunAt,
10821136
failedCount: 0,
10831137
lastQueuedAt: null,
1138+
runCount: newRunCount,
1139+
status: isOneTime || maxRunsReached ? 'completed' : 'active',
10841140
},
10851141
requestId,
1086-
`Error updating job ${payload.scheduleId} after completion`
1087-
)
1088-
return
1089-
}
1090-
1091-
const isOneTime = !jobRecord.cronExpression
1092-
let nextRunAt: Date | null = null
1093-
1094-
if (!isOneTime && jobRecord.cronExpression) {
1095-
const validation = validateCronExpression(
1096-
jobRecord.cronExpression,
1097-
jobRecord.timezone || 'UTC'
1142+
`Error updating job ${payload.scheduleId} after success`
10981143
)
1099-
nextRunAt = validation.nextRun || null
1100-
}
1101-
1102-
const maxRunsReached = jobRecord.maxRuns && newRunCount >= jobRecord.maxRuns
1103-
if (maxRunsReached) {
1104-
logger.info(`[${requestId}] Job hit maxRuns limit`, {
1105-
scheduleId: payload.scheduleId,
1106-
maxRuns: jobRecord.maxRuns,
1107-
runCount: newRunCount,
1108-
})
1144+
} finally {
1145+
timeoutController.cleanup()
11091146
}
1110-
1111-
await applyScheduleUpdate(
1112-
payload.scheduleId,
1113-
{
1114-
lastRanAt: now,
1115-
updatedAt: now,
1116-
nextRunAt: isOneTime || maxRunsReached ? null : nextRunAt,
1117-
failedCount: 0,
1118-
lastQueuedAt: null,
1119-
runCount: newRunCount,
1120-
status: isOneTime || maxRunsReached ? 'completed' : 'active',
1121-
},
1122-
requestId,
1123-
`Error updating job ${payload.scheduleId} after success`
1124-
)
11251147
} catch (error) {
11261148
const errorMessage = toError(error).message
11271149
logger.error(`[${requestId}] Job execution failed`, {

0 commit comments

Comments
 (0)