Skip to content

Commit f0adc5e

Browse files
icecrasher321Theodore Li
authored andcommitted
feat(concurrency): bullmq based concurrency control system (#3605)
* feat(concurrency): bullmq based queueing system * fix bun lock * remove manual execs off queues * address comments * fix legacy team limits * cleanup enterprise typing code * inline child triggers * fix status check * address more comments * optimize reconciler scan * remove dead code * add to landing page * Add load testing framework * update bullmq * fix * fix headless path --------- Co-authored-by: Theodore Li <teddy@zenobiapay.com>
1 parent 251046d commit f0adc5e

File tree

88 files changed

+6273
-959
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+6273
-959
lines changed

apps/docs/content/docs/en/execution/costs.mdx

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,17 @@ By default, your usage is capped at the credits included in your plan. To allow
195195

196196
Max (individual) shares the same rate limits as team plans. Team plans (Pro or Max for Teams) use the Max-tier rate limits.
197197

198+
### Concurrent Execution Limits
199+
200+
| Plan | Concurrent Executions |
201+
|------|----------------------|
202+
| **Free** | 5 |
203+
| **Pro** | 50 |
204+
| **Max / Team** | 200 |
205+
| **Enterprise** | 200 (customizable) |
206+
207+
Concurrent execution limits control how many workflow executions can run simultaneously within a workspace. When the limit is reached, new executions are queued and admitted as running executions complete. Manual runs from the editor are not subject to these limits.
208+
198209
### File Storage
199210

200211
| Plan | Storage |

apps/sim/app/(home)/components/pricing/pricing.tsx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const PRICING_TIERS: PricingTier[] = [
2525
'5GB file storage',
2626
'3 tables · 1,000 rows each',
2727
'5 min execution limit',
28+
'5 concurrent/workspace',
2829
'7-day log retention',
2930
'CLI/SDK/MCP Access',
3031
],
@@ -42,6 +43,7 @@ const PRICING_TIERS: PricingTier[] = [
4243
'50GB file storage',
4344
'25 tables · 5,000 rows each',
4445
'50 min execution · 150 runs/min',
46+
'50 concurrent/workspace',
4547
'Unlimited log retention',
4648
'CLI/SDK/MCP Access',
4749
],
@@ -59,6 +61,7 @@ const PRICING_TIERS: PricingTier[] = [
5961
'500GB file storage',
6062
'25 tables · 5,000 rows each',
6163
'50 min execution · 300 runs/min',
64+
'200 concurrent/workspace',
6265
'Unlimited log retention',
6366
'CLI/SDK/MCP Access',
6467
],
@@ -75,6 +78,7 @@ const PRICING_TIERS: PricingTier[] = [
7578
'Custom file storage',
7679
'10,000 tables · 1M rows each',
7780
'Custom execution limits',
81+
'Custom concurrency limits',
7882
'Unlimited log retention',
7983
'SSO & SCIM · SOC2 & HIPAA',
8084
'Self hosting · Dedicated support',

apps/sim/app/api/copilot/chat/route.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { and, desc, eq, sql } from 'drizzle-orm'
55
import { type NextRequest, NextResponse } from 'next/server'
66
import { z } from 'zod'
77
import { getSession } from '@/lib/auth'
8+
import { createRunSegment } from '@/lib/copilot/async-runs/repository'
89
import { getAccessibleCopilotChat, resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
910
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
1011
import {
@@ -539,10 +540,26 @@ export async function POST(req: NextRequest) {
539540
return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS })
540541
}
541542

543+
const nsExecutionId = crypto.randomUUID()
544+
const nsRunId = crypto.randomUUID()
545+
546+
if (actualChatId) {
547+
await createRunSegment({
548+
id: nsRunId,
549+
executionId: nsExecutionId,
550+
chatId: actualChatId,
551+
userId: authenticatedUserId,
552+
workflowId,
553+
streamId: userMessageIdToUse,
554+
}).catch(() => {})
555+
}
556+
542557
const nonStreamingResult = await orchestrateCopilotStream(requestPayload, {
543558
userId: authenticatedUserId,
544559
workflowId,
545560
chatId: actualChatId,
561+
executionId: nsExecutionId,
562+
runId: nsRunId,
546563
goRoute: '/api/copilot',
547564
autoExecuteTools: true,
548565
interactive: true,
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import type { NextRequest } from 'next/server'
5+
import { beforeEach, describe, expect, it, vi } from 'vitest'
6+
7+
const {
8+
mockCheckHybridAuth,
9+
mockGetDispatchJobRecord,
10+
mockGetJobQueue,
11+
mockVerifyWorkflowAccess,
12+
mockGetWorkflowById,
13+
} = vi.hoisted(() => ({
14+
mockCheckHybridAuth: vi.fn(),
15+
mockGetDispatchJobRecord: vi.fn(),
16+
mockGetJobQueue: vi.fn(),
17+
mockVerifyWorkflowAccess: vi.fn(),
18+
mockGetWorkflowById: vi.fn(),
19+
}))
20+
21+
vi.mock('@sim/logger', () => ({
22+
createLogger: () => ({
23+
info: vi.fn(),
24+
warn: vi.fn(),
25+
error: vi.fn(),
26+
debug: vi.fn(),
27+
}),
28+
}))
29+
30+
vi.mock('@/lib/auth/hybrid', () => ({
31+
checkHybridAuth: mockCheckHybridAuth,
32+
}))
33+
34+
vi.mock('@/lib/core/async-jobs', () => ({
35+
JOB_STATUS: {
36+
PENDING: 'pending',
37+
PROCESSING: 'processing',
38+
COMPLETED: 'completed',
39+
FAILED: 'failed',
40+
},
41+
getJobQueue: mockGetJobQueue,
42+
}))
43+
44+
vi.mock('@/lib/core/workspace-dispatch/store', () => ({
45+
getDispatchJobRecord: mockGetDispatchJobRecord,
46+
}))
47+
48+
vi.mock('@/lib/core/utils/request', () => ({
49+
generateRequestId: vi.fn().mockReturnValue('request-1'),
50+
}))
51+
52+
vi.mock('@/socket/middleware/permissions', () => ({
53+
verifyWorkflowAccess: mockVerifyWorkflowAccess,
54+
}))
55+
56+
vi.mock('@/lib/workflows/utils', () => ({
57+
getWorkflowById: mockGetWorkflowById,
58+
}))
59+
60+
import { GET } from './route'
61+
62+
function createMockRequest(): NextRequest {
63+
return {
64+
headers: {
65+
get: () => null,
66+
},
67+
} as NextRequest
68+
}
69+
70+
describe('GET /api/jobs/[jobId]', () => {
71+
beforeEach(() => {
72+
vi.clearAllMocks()
73+
74+
mockCheckHybridAuth.mockResolvedValue({
75+
success: true,
76+
userId: 'user-1',
77+
apiKeyType: undefined,
78+
workspaceId: undefined,
79+
})
80+
81+
mockVerifyWorkflowAccess.mockResolvedValue({ hasAccess: true })
82+
mockGetWorkflowById.mockResolvedValue({
83+
id: 'workflow-1',
84+
workspaceId: 'workspace-1',
85+
})
86+
87+
mockGetJobQueue.mockResolvedValue({
88+
getJob: vi.fn().mockResolvedValue(null),
89+
})
90+
})
91+
92+
it('returns dispatcher-aware waiting status with metadata', async () => {
93+
mockGetDispatchJobRecord.mockResolvedValue({
94+
id: 'dispatch-1',
95+
workspaceId: 'workspace-1',
96+
lane: 'runtime',
97+
queueName: 'workflow-execution',
98+
bullmqJobName: 'workflow-execution',
99+
bullmqPayload: {},
100+
metadata: {
101+
workflowId: 'workflow-1',
102+
},
103+
priority: 10,
104+
status: 'waiting',
105+
createdAt: 1000,
106+
admittedAt: 2000,
107+
})
108+
109+
const response = await GET(createMockRequest(), {
110+
params: Promise.resolve({ jobId: 'dispatch-1' }),
111+
})
112+
const body = await response.json()
113+
114+
expect(response.status).toBe(200)
115+
expect(body.status).toBe('waiting')
116+
expect(body.metadata.queueName).toBe('workflow-execution')
117+
expect(body.metadata.lane).toBe('runtime')
118+
expect(body.metadata.workspaceId).toBe('workspace-1')
119+
})
120+
121+
it('returns completed output from dispatch state', async () => {
122+
mockGetDispatchJobRecord.mockResolvedValue({
123+
id: 'dispatch-2',
124+
workspaceId: 'workspace-1',
125+
lane: 'interactive',
126+
queueName: 'workflow-execution',
127+
bullmqJobName: 'direct-workflow-execution',
128+
bullmqPayload: {},
129+
metadata: {
130+
workflowId: 'workflow-1',
131+
},
132+
priority: 1,
133+
status: 'completed',
134+
createdAt: 1000,
135+
startedAt: 2000,
136+
completedAt: 7000,
137+
output: { success: true },
138+
})
139+
140+
const response = await GET(createMockRequest(), {
141+
params: Promise.resolve({ jobId: 'dispatch-2' }),
142+
})
143+
const body = await response.json()
144+
145+
expect(response.status).toBe(200)
146+
expect(body.status).toBe('completed')
147+
expect(body.output).toEqual({ success: true })
148+
expect(body.metadata.duration).toBe(5000)
149+
})
150+
151+
it('returns 404 when neither dispatch nor BullMQ job exists', async () => {
152+
mockGetDispatchJobRecord.mockResolvedValue(null)
153+
154+
const response = await GET(createMockRequest(), {
155+
params: Promise.resolve({ jobId: 'missing-job' }),
156+
})
157+
158+
expect(response.status).toBe(404)
159+
})
160+
})

apps/sim/app/api/jobs/[jobId]/route.ts

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { createLogger } from '@sim/logger'
22
import { type NextRequest, NextResponse } from 'next/server'
33
import { checkHybridAuth } from '@/lib/auth/hybrid'
4-
import { getJobQueue, JOB_STATUS } from '@/lib/core/async-jobs'
4+
import { getJobQueue } from '@/lib/core/async-jobs'
55
import { generateRequestId } from '@/lib/core/utils/request'
6+
import { presentDispatchOrJobStatus } from '@/lib/core/workspace-dispatch/status'
7+
import { getDispatchJobRecord } from '@/lib/core/workspace-dispatch/store'
68
import { createErrorResponse } from '@/app/api/workflows/utils'
79

810
const logger = createLogger('TaskStatusAPI')
@@ -23,68 +25,54 @@ export async function GET(
2325

2426
const authenticatedUserId = authResult.userId
2527

28+
const dispatchJob = await getDispatchJobRecord(taskId)
2629
const jobQueue = await getJobQueue()
27-
const job = await jobQueue.getJob(taskId)
30+
const job = dispatchJob ? null : await jobQueue.getJob(taskId)
2831

29-
if (!job) {
32+
if (!job && !dispatchJob) {
3033
return createErrorResponse('Task not found', 404)
3134
}
3235

33-
if (job.metadata?.workflowId) {
36+
const metadataToCheck = dispatchJob?.metadata ?? job?.metadata
37+
38+
if (metadataToCheck?.workflowId) {
3439
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
3540
const accessCheck = await verifyWorkflowAccess(
3641
authenticatedUserId,
37-
job.metadata.workflowId as string
42+
metadataToCheck.workflowId as string
3843
)
3944
if (!accessCheck.hasAccess) {
40-
logger.warn(`[${requestId}] Access denied to workflow ${job.metadata.workflowId}`)
45+
logger.warn(`[${requestId}] Access denied to workflow ${metadataToCheck.workflowId}`)
4146
return createErrorResponse('Access denied', 403)
4247
}
4348

4449
if (authResult.apiKeyType === 'workspace' && authResult.workspaceId) {
4550
const { getWorkflowById } = await import('@/lib/workflows/utils')
46-
const workflow = await getWorkflowById(job.metadata.workflowId as string)
51+
const workflow = await getWorkflowById(metadataToCheck.workflowId as string)
4752
if (!workflow?.workspaceId || workflow.workspaceId !== authResult.workspaceId) {
4853
return createErrorResponse('API key is not authorized for this workspace', 403)
4954
}
5055
}
51-
} else if (job.metadata?.userId && job.metadata.userId !== authenticatedUserId) {
52-
logger.warn(`[${requestId}] Access denied to user ${job.metadata.userId}`)
56+
} else if (metadataToCheck?.userId && metadataToCheck.userId !== authenticatedUserId) {
57+
logger.warn(`[${requestId}] Access denied to user ${metadataToCheck.userId}`)
5358
return createErrorResponse('Access denied', 403)
54-
} else if (!job.metadata?.userId && !job.metadata?.workflowId) {
59+
} else if (!metadataToCheck?.userId && !metadataToCheck?.workflowId) {
5560
logger.warn(`[${requestId}] Access denied to job ${taskId}`)
5661
return createErrorResponse('Access denied', 403)
5762
}
5863

59-
const mappedStatus = job.status === JOB_STATUS.PENDING ? 'queued' : job.status
60-
64+
const presented = presentDispatchOrJobStatus(dispatchJob, job)
6165
const response: any = {
6266
success: true,
6367
taskId,
64-
status: mappedStatus,
65-
metadata: {
66-
startedAt: job.startedAt,
67-
},
68-
}
69-
70-
if (job.status === JOB_STATUS.COMPLETED) {
71-
response.output = job.output
72-
response.metadata.completedAt = job.completedAt
73-
if (job.startedAt && job.completedAt) {
74-
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
75-
}
76-
}
77-
78-
if (job.status === JOB_STATUS.FAILED) {
79-
response.error = job.error
80-
response.metadata.completedAt = job.completedAt
81-
if (job.startedAt && job.completedAt) {
82-
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
83-
}
68+
status: presented.status,
69+
metadata: presented.metadata,
8470
}
8571

86-
if (job.status === JOB_STATUS.PROCESSING || job.status === JOB_STATUS.PENDING) {
87-
response.estimatedDuration = 300000
72+
if (presented.output !== undefined) response.output = presented.output
73+
if (presented.error !== undefined) response.error = presented.error
74+
if (presented.estimatedDuration !== undefined) {
75+
response.estimatedDuration = presented.estimatedDuration
8876
}
8977

9078
return NextResponse.json(response)

apps/sim/app/api/mcp/copilot/route.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { eq, sql } from 'drizzle-orm'
1818
import { type NextRequest, NextResponse } from 'next/server'
1919
import { validateOAuthAccessToken } from '@/lib/auth/oauth-token'
2020
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
21+
import { createRunSegment } from '@/lib/copilot/async-runs/repository'
2122
import { ORCHESTRATION_TIMEOUT_MS, SIM_AGENT_API_URL } from '@/lib/copilot/constants'
2223
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
2324
import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent'
@@ -727,10 +728,25 @@ async function handleBuildToolCall(
727728
chatId,
728729
}
729730

731+
const executionId = crypto.randomUUID()
732+
const runId = crypto.randomUUID()
733+
const messageId = requestPayload.messageId as string
734+
735+
await createRunSegment({
736+
id: runId,
737+
executionId,
738+
chatId,
739+
userId,
740+
workflowId: resolved.workflowId,
741+
streamId: messageId,
742+
}).catch(() => {})
743+
730744
const result = await orchestrateCopilotStream(requestPayload, {
731745
userId,
732746
workflowId: resolved.workflowId,
733747
chatId,
748+
executionId,
749+
runId,
734750
goRoute: '/api/mcp',
735751
autoExecuteTools: true,
736752
timeout: ORCHESTRATION_TIMEOUT_MS,

0 commit comments

Comments
 (0)