Skip to content

Commit 1e5cc39

Browse files
improvement(deployment): solve multiple client side races, and deployed state management issues (#4502)
* improvement(deploy): state transitions * more fixes * improvements and shift to outbox policy with eager call * address comments * ux improvement * address comments * address bugbot
1 parent 00b5b74 commit 1e5cc39

80 files changed

Lines changed: 5065 additions & 1039 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/sim/app/api/chat/manage/[id]/route.test.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import {
1616
workflowsOrchestrationMock,
1717
workflowsOrchestrationMockFns,
1818
workflowsPersistenceUtilsMock,
19-
workflowsPersistenceUtilsMockFns,
2019
} from '@sim/testing'
2120
import { NextRequest } from 'next/server'
2221
import { beforeEach, describe, expect, it, vi } from 'vitest'
@@ -28,7 +27,7 @@ const { mockCheckChatAccess } = vi.hoisted(() => ({
2827
const mockCreateSuccessResponse = workflowsApiUtilsMockFns.mockCreateSuccessResponse
2928
const mockCreateErrorResponse = workflowsApiUtilsMockFns.mockCreateErrorResponse
3029
const mockEncryptSecret = encryptionMockFns.mockEncryptSecret
31-
const mockDeployWorkflow = workflowsPersistenceUtilsMockFns.mockDeployWorkflow
30+
const mockPerformFullDeploy = workflowsOrchestrationMockFns.mockPerformFullDeploy
3231
const mockPerformChatUndeploy = workflowsOrchestrationMockFns.mockPerformChatUndeploy
3332
const mockNotifySocketDeploymentChanged =
3433
workflowsOrchestrationMockFns.mockNotifySocketDeploymentChanged
@@ -73,7 +72,7 @@ describe('Chat Edit API Route', () => {
7372
})
7473

7574
mockEncryptSecret.mockResolvedValue({ encrypted: 'encrypted-password' })
76-
mockDeployWorkflow.mockResolvedValue({ success: true, version: 1 })
75+
mockPerformFullDeploy.mockResolvedValue({ success: true, version: 1 })
7776
mockNotifySocketDeploymentChanged.mockResolvedValue(undefined)
7877
})
7978

apps/sim/app/api/chat/manage/[id]/route.ts

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ import { isDev } from '@/lib/core/config/feature-flags'
1111
import { encryptSecret } from '@/lib/core/security/encryption'
1212
import { getEmailDomain } from '@/lib/core/utils/urls'
1313
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
14-
import { notifySocketDeploymentChanged, performChatUndeploy } from '@/lib/workflows/orchestration'
15-
import { deployWorkflow } from '@/lib/workflows/persistence/utils'
14+
import { performChatUndeploy, performFullDeploy } from '@/lib/workflows/orchestration'
1615
import { checkChatAccess } from '@/app/api/chat/utils'
1716
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
1817

1918
export const dynamic = 'force-dynamic'
19+
export const maxDuration = 120
2020

2121
const logger = createLogger('ChatDetailAPI')
2222

@@ -126,21 +126,8 @@ export const PATCH = withRouteHandler(
126126
}
127127
}
128128

129-
// Redeploy the workflow to ensure latest version is active
130-
const deployResult = await deployWorkflow({
131-
workflowId: existingChat[0].workflowId,
132-
deployedBy: session.user.id,
133-
})
134-
135-
if (!deployResult.success) {
136-
logger.warn(
137-
`Failed to redeploy workflow for chat update: ${deployResult.error}, continuing with chat update`
138-
)
139-
} else {
140-
logger.info(
141-
`Redeployed workflow ${existingChat[0].workflowId} for chat update (v${deployResult.version})`
142-
)
143-
await notifySocketDeploymentChanged(existingChat[0].workflowId)
129+
if (workflowId && workflowId !== existingChat[0].workflowId) {
130+
return createErrorResponse('Changing a chat deployment workflow is not supported', 400)
144131
}
145132

146133
let encryptedPassword
@@ -156,6 +143,27 @@ export const PATCH = withRouteHandler(
156143
logger.info('Keeping existing password')
157144
}
158145

146+
// Redeploy the workflow to ensure latest version is active
147+
const deployResult = await performFullDeploy({
148+
workflowId: existingChat[0].workflowId,
149+
userId: session.user.id,
150+
request,
151+
})
152+
153+
if (!deployResult.success) {
154+
logger.warn(`Failed to redeploy workflow for chat update: ${deployResult.error}`)
155+
const status =
156+
deployResult.errorCode === 'validation'
157+
? 400
158+
: deployResult.errorCode === 'not_found'
159+
? 404
160+
: 500
161+
return createErrorResponse(deployResult.error || 'Failed to redeploy workflow', status)
162+
}
163+
logger.info(
164+
`Redeployed workflow ${existingChat[0].workflowId} for chat update (v${deployResult.version})`
165+
)
166+
159167
const updateData: Record<string, unknown> = {
160168
updatedAt: new Date(),
161169
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import {
5+
authMockFns,
6+
dbChainMock,
7+
dbChainMockFns,
8+
resetDbChainMock,
9+
workflowsApiUtilsMock,
10+
workflowsApiUtilsMockFns,
11+
workflowsOrchestrationMock,
12+
workflowsOrchestrationMockFns,
13+
} from '@sim/testing'
14+
import { NextRequest } from 'next/server'
15+
import { beforeEach, describe, expect, it, vi } from 'vitest'
16+
17+
const { mockCheckWorkflowAccessForFormCreation } = vi.hoisted(() => ({
18+
mockCheckWorkflowAccessForFormCreation: vi.fn(),
19+
}))
20+
21+
const mockCreateErrorResponse = workflowsApiUtilsMockFns.mockCreateErrorResponse
22+
const mockPerformFullDeploy = workflowsOrchestrationMockFns.mockPerformFullDeploy
23+
24+
vi.mock('@sim/db', () => dbChainMock)
25+
26+
vi.mock('@sim/utils/id', () => ({
27+
generateId: vi.fn(() => 'form-123'),
28+
}))
29+
30+
vi.mock('@/app/api/form/utils', () => ({
31+
checkWorkflowAccessForFormCreation: mockCheckWorkflowAccessForFormCreation,
32+
DEFAULT_FORM_CUSTOMIZATIONS: {},
33+
}))
34+
35+
vi.mock('@/app/api/workflows/utils', () => workflowsApiUtilsMock)
36+
37+
vi.mock('@/lib/core/config/feature-flags', () => ({
38+
isDev: true,
39+
}))
40+
41+
vi.mock('@/lib/core/utils/urls', () => ({
42+
getEmailDomain: vi.fn(() => 'localhost:3000'),
43+
}))
44+
45+
vi.mock('@/lib/workflows/orchestration', () => workflowsOrchestrationMock)
46+
47+
import { POST } from '@/app/api/form/route'
48+
49+
describe('Form API Route', () => {
50+
beforeEach(() => {
51+
vi.clearAllMocks()
52+
resetDbChainMock()
53+
54+
authMockFns.mockGetSession.mockResolvedValue({
55+
user: {
56+
id: 'user-123',
57+
email: 'user@example.com',
58+
name: 'Test User',
59+
},
60+
})
61+
mockCreateErrorResponse.mockImplementation((message, status = 500) => {
62+
return new Response(JSON.stringify({ error: message }), {
63+
status,
64+
headers: { 'Content-Type': 'application/json' },
65+
})
66+
})
67+
mockCheckWorkflowAccessForFormCreation.mockResolvedValue({
68+
hasAccess: true,
69+
workflow: {
70+
id: 'workflow-123',
71+
isDeployed: false,
72+
workspaceId: 'workspace-123',
73+
},
74+
})
75+
dbChainMockFns.limit.mockResolvedValue([])
76+
})
77+
78+
it('cleans up inserted form when deploy throws', async () => {
79+
mockPerformFullDeploy.mockRejectedValue(new Error('Deploy exploded'))
80+
81+
const request = new NextRequest('http://localhost:3000/api/form', {
82+
method: 'POST',
83+
body: JSON.stringify({
84+
workflowId: 'workflow-123',
85+
identifier: 'test-form',
86+
title: 'Test Form',
87+
}),
88+
})
89+
90+
const response = await POST(request)
91+
92+
expect(response.status).toBe(500)
93+
expect(dbChainMockFns.insert).toHaveBeenCalled()
94+
expect(dbChainMockFns.delete).toHaveBeenCalled()
95+
expect(mockCreateErrorResponse).toHaveBeenCalledWith('Deploy exploded', 500)
96+
})
97+
})

apps/sim/app/api/form/route.ts

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,28 @@ import { isDev } from '@/lib/core/config/feature-flags'
1212
import { encryptSecret } from '@/lib/core/security/encryption'
1313
import { getEmailDomain } from '@/lib/core/utils/urls'
1414
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
15-
import { notifySocketDeploymentChanged } from '@/lib/workflows/orchestration'
16-
import { deployWorkflow } from '@/lib/workflows/persistence/utils'
15+
import { performFullDeploy } from '@/lib/workflows/orchestration'
1716
import {
1817
checkWorkflowAccessForFormCreation,
1918
DEFAULT_FORM_CUSTOMIZATIONS,
2019
} from '@/app/api/form/utils'
2120
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
2221

2322
const logger = createLogger('FormAPI')
23+
export const maxDuration = 120
2424

2525
function getErrorMessage(error: unknown, fallback: string): string {
2626
return error instanceof Error ? error.message : fallback
2727
}
2828

29+
async function cleanupFormAfterDeployFailure(formId: string) {
30+
try {
31+
await db.delete(form).where(eq(form.id, formId))
32+
} catch (cleanupError) {
33+
logger.error('Failed to clean up form after deploy failure:', cleanupError)
34+
}
35+
}
36+
2937
export const GET = withRouteHandler(async (request: NextRequest) => {
3038
try {
3139
const session = await getSession()
@@ -106,21 +114,6 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
106114
return createErrorResponse('Workflow not found or access denied', 404)
107115
}
108116

109-
const result = await deployWorkflow({
110-
workflowId,
111-
deployedBy: session.user.id,
112-
})
113-
114-
if (!result.success) {
115-
return createErrorResponse(result.error || 'Failed to deploy workflow', 500)
116-
}
117-
118-
logger.info(
119-
`${workflowRecord.isDeployed ? 'Redeployed' : 'Auto-deployed'} workflow ${workflowId} for form (v${result.version})`
120-
)
121-
122-
await notifySocketDeploymentChanged(workflowId)
123-
124117
let encryptedPassword = null
125118
if (authType === 'password' && password) {
126119
const { encrypted } = await encryptSecret(password)
@@ -161,6 +154,29 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
161154
updatedAt: new Date(),
162155
})
163156

157+
let result: Awaited<ReturnType<typeof performFullDeploy>>
158+
try {
159+
result = await performFullDeploy({
160+
workflowId,
161+
userId: session.user.id,
162+
request,
163+
})
164+
} catch (error) {
165+
await cleanupFormAfterDeployFailure(id)
166+
throw error
167+
}
168+
169+
if (!result.success) {
170+
await cleanupFormAfterDeployFailure(id)
171+
const status =
172+
result.errorCode === 'validation' ? 400 : result.errorCode === 'not_found' ? 404 : 500
173+
return createErrorResponse(result.error || 'Failed to deploy workflow', status)
174+
}
175+
176+
logger.info(
177+
`${workflowRecord.isDeployed ? 'Redeployed' : 'Auto-deployed'} workflow ${workflowId} for form (v${result.version})`
178+
)
179+
164180
const baseDomain = getEmailDomain()
165181
const protocol = isDev ? 'http' : 'https'
166182
const formUrl = `${protocol}://${baseDomain}/form/${identifier}`

apps/sim/app/api/schedules/execute/route.test.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const {
2020
mockExecuteJobInline,
2121
mockFeatureFlags,
2222
mockEnqueue,
23+
mockGetJob,
2324
mockStartJob,
2425
mockCompleteJob,
2526
mockMarkJobFailed,
@@ -34,6 +35,7 @@ const {
3435
isDev: true,
3536
},
3637
mockEnqueue: vi.fn().mockResolvedValue('job-id-1'),
38+
mockGetJob: vi.fn().mockResolvedValue(null),
3739
mockStartJob: vi.fn().mockResolvedValue(undefined),
3840
mockCompleteJob: vi.fn().mockResolvedValue(undefined),
3941
mockMarkJobFailed: vi.fn().mockResolvedValue(undefined),
@@ -54,6 +56,7 @@ vi.mock('@/lib/core/config/feature-flags', () => mockFeatureFlags)
5456
vi.mock('@/lib/core/async-jobs', () => ({
5557
getJobQueue: vi.fn().mockResolvedValue({
5658
enqueue: mockEnqueue,
59+
getJob: mockGetJob,
5760
startJob: mockStartJob,
5861
completeJob: mockCompleteJob,
5962
markJobFailed: mockMarkJobFailed,
@@ -69,6 +72,7 @@ vi.mock('drizzle-orm', () => ({
6972
ne: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'ne' })),
7073
lte: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'lte' })),
7174
lt: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'lt' })),
75+
inArray: vi.fn((field: unknown, values: unknown[]) => ({ field, values, type: 'inArray' })),
7276
not: vi.fn((condition: unknown) => ({ type: 'not', condition })),
7377
isNull: vi.fn((field: unknown) => ({ type: 'isNull', field })),
7478
or: vi.fn((...conditions: unknown[]) => ({ type: 'or', conditions })),
@@ -166,6 +170,8 @@ function createMockRequest(): NextRequest {
166170
describe('Scheduled Workflow Execution API Route', () => {
167171
beforeEach(() => {
168172
vi.clearAllMocks()
173+
dbChainMockFns.limit.mockReset()
174+
dbChainMockFns.returning.mockReset()
169175
resetDbChainMock()
170176
requestUtilsMockFns.mockGenerateRequestId.mockReturnValue('test-request-id')
171177
workflowsUtilsMockFns.mockGetWorkflowById.mockResolvedValue({
@@ -180,6 +186,7 @@ describe('Scheduled Workflow Execution API Route', () => {
180186
})
181187

182188
it('should execute scheduled workflows with Trigger.dev disabled', async () => {
189+
dbChainMockFns.limit.mockResolvedValueOnce([{ id: 'schedule-1' }]).mockResolvedValueOnce([])
183190
dbChainMockFns.returning.mockReturnValueOnce(SINGLE_SCHEDULE).mockReturnValueOnce([])
184191

185192
const response = await GET(createMockRequest())
@@ -193,6 +200,7 @@ describe('Scheduled Workflow Execution API Route', () => {
193200

194201
it('should queue schedules to Trigger.dev when enabled', async () => {
195202
mockFeatureFlags.isTriggerDevEnabled = true
203+
dbChainMockFns.limit.mockResolvedValueOnce([{ id: 'schedule-1' }]).mockResolvedValueOnce([])
196204
dbChainMockFns.returning.mockReturnValueOnce(SINGLE_SCHEDULE).mockReturnValueOnce([])
197205

198206
const response = await GET(createMockRequest())
@@ -215,6 +223,9 @@ describe('Scheduled Workflow Execution API Route', () => {
215223
})
216224

217225
it('should execute multiple schedules in parallel', async () => {
226+
dbChainMockFns.limit
227+
.mockResolvedValueOnce([{ id: 'schedule-1' }, { id: 'schedule-2' }])
228+
.mockResolvedValueOnce([])
218229
dbChainMockFns.returning.mockReturnValueOnce(MULTIPLE_SCHEDULES).mockReturnValueOnce([])
219230

220231
const response = await GET(createMockRequest())
@@ -225,7 +236,8 @@ describe('Scheduled Workflow Execution API Route', () => {
225236
})
226237

227238
it('should execute mothership jobs inline', async () => {
228-
dbChainMockFns.returning.mockReturnValueOnce([]).mockReturnValueOnce(SINGLE_JOB)
239+
dbChainMockFns.limit.mockResolvedValueOnce([]).mockResolvedValueOnce([{ id: 'job-1' }])
240+
dbChainMockFns.returning.mockReturnValueOnce(SINGLE_JOB)
229241

230242
const response = await GET(createMockRequest())
231243

@@ -241,6 +253,7 @@ describe('Scheduled Workflow Execution API Route', () => {
241253
})
242254

243255
it('should enqueue schedule with correlation metadata via job queue', async () => {
256+
dbChainMockFns.limit.mockResolvedValueOnce([{ id: 'schedule-1' }]).mockResolvedValueOnce([])
244257
dbChainMockFns.returning.mockReturnValueOnce(SINGLE_SCHEDULE).mockReturnValueOnce([])
245258

246259
const response = await GET(createMockRequest())
@@ -255,6 +268,8 @@ describe('Scheduled Workflow Execution API Route', () => {
255268
requestId: 'test-request-id',
256269
}),
257270
expect.objectContaining({
271+
jobId: expect.stringMatching(/^schedule_[0-9a-f]{32}$/),
272+
concurrencyKey: expect.stringMatching(/^schedule_[0-9a-f]{32}$/),
258273
metadata: expect.objectContaining({
259274
workflowId: 'workflow-1',
260275
workspaceId: 'workspace-1',

0 commit comments

Comments
 (0)