
Commit 81845ae

fix(table): trigger cascade race fixes, polling, workflow column flag (#4499)
* fix(table): don't let a parallel queued stamp overwrite a worker that already started

  Stamps fire in chunks of 20 via Promise.all, so queued writes race with the worker's markWorkflowGroupPickedUp (running). When the late queued stamp landed second it overwrote running, and the cell looked stuck in queued for the rest of the run. Skip the stamp when the same execution is already past queued — the worker's authority wins.

* fix(table): per-page polling, optimistic skip on filled outputs, workflow column flag

  - Polling now refetches only pages that contain in-flight cells instead of every loaded page. Idle pages stay untouched while a cascade runs.
  - The run_column optimistic patch mirrors server eligibility on mode='incomplete': cells with filled outputs no longer flip to queued only to revert seconds later when the server returns 0 triggered.
  - Hide the Workflow column type behind NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED (default false). Existing workflow groups keep rendering.

* fix(table): memoize the infinite-rows queryKey so the polling effect doesn't reset every tick

* fix lint

* fix(table): serialize polling ticks to prevent overlapping fetches

* fix(table): fire-and-forget run-column dispatch

  Large fan-outs (thousands of rows) issue sequential trigger.dev HTTP calls inside scheduleRunsForRows.batchEnqueue. Awaiting that loop held the HTTP response (and the AI tool span) open for ~5 min on a 6k-row table — the user saw an 11-min "running" because the tool didn't return until every job had been enqueued. Run the dispatcher in the background and return immediately; the contract response now reports `triggered: null` since the count isn't known synchronously.

* improvement(table): preserve row identity across poll refetches

  Each poll tick brings back a fresh page from the server with all-new row objects, even though most rows haven't changed. setQueryData was replacing the whole page reference, which made every memoized <DataRow> in the page re-render every 1.5s. Now we shallow-compare each fresh row against the cached one and reuse the cached reference when nothing changed; only rows whose data or exec status actually flipped re-render.

* fix(table): treat manual-bypass eligibility as runnable

  Refactoring eligibility into classifyEligibility split the runnable answer into two reasons: 'eligible' (deps satisfied) and 'manual-bypass' (autoRun=false group on a manual run, where deps don't apply). isGroupEligible only treated 'eligible' as runnable, so a manual "Run all rows" on a single autoRun=false group filtered every row out and returned triggered: 0. Cells flashed queued from the optimistic patch, then went empty when the refetch landed.
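The queued/running race fix described in the first bullet lands in execution-stamping code that is not part of the diffs shown below. A rough sketch of the guard it describes, with all names illustrative rather than the repo's actual API:

```typescript
type ExecStatus = 'queued' | 'running' | 'completed' | 'error' | 'cancelled'

interface ExecMeta {
  executionId: string
  status: ExecStatus
}

// Statuses that mean the worker has already picked up this execution;
// a late "queued" stamp must not overwrite them.
const PAST_QUEUED: ExecStatus[] = ['running', 'completed', 'error', 'cancelled']

/**
 * Returns the next executions map, skipping the queued stamp when the same
 * execution has already progressed past "queued" (the worker's write wins).
 * Hypothetical helper for illustration only.
 */
function stampQueued(
  executions: Record<string, ExecMeta>,
  groupId: string,
  executionId: string
): Record<string, ExecMeta> {
  const current = executions[groupId]
  if (
    current &&
    current.executionId === executionId &&
    PAST_QUEUED.includes(current.status)
  ) {
    return executions // worker already started; keep its status
  }
  return { ...executions, [groupId]: { executionId, status: 'queued' } }
}
```

Because the batched stamps and the worker write race over the same cell, the skip must key on the execution id: a stamp for a *new* execution of the same group still goes through.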
1 parent b045eae commit 81845ae

9 files changed

Lines changed: 220 additions & 48 deletions


apps/sim/app/api/table/[tableId]/columns/run/route.ts

Lines changed: 9 additions & 2 deletions
@@ -1,4 +1,5 @@
 import { createLogger } from '@sim/logger'
+import { toError } from '@sim/utils/errors'
 import { type NextRequest, NextResponse } from 'next/server'
 import { runColumnContract } from '@/lib/api/contracts/tables'
 import { parseRequest } from '@/lib/api/server'
@@ -29,15 +30,21 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
     const access = await checkAccess(tableId, auth.userId, 'write')
     if (!access.ok) return accessError(access, requestId, tableId)

-    const { triggered } = await runWorkflowColumn({
+    // Dispatch in the background — large fan-outs (thousands of rows) issue
+    // sequential trigger.dev calls and would otherwise hold the HTTP response
+    // open for minutes, blocking the AI/copilot tool span and the UI mutation.
+    void runWorkflowColumn({
       tableId,
       workspaceId,
       groupIds,
       mode: runMode,
       rowIds,
       requestId,
+    }).catch((err) => {
+      logger.error(`[${requestId}] run-column dispatch failed:`, toError(err).message)
     })
-    return NextResponse.json({ success: true, data: { triggered } })
+
+    return NextResponse.json({ success: true, data: { triggered: null } })
   } catch (error) {
     if (error instanceof Error && error.message === 'Invalid workspace ID') {
       return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
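The `void runWorkflowColumn({ ... }).catch(...)` shape above is the standard fire-and-forget pattern: detach the promise so the response returns immediately, but attach a `.catch` so a dispatch failure is logged instead of surfacing as an unhandled rejection. A minimal standalone sketch (helper name hypothetical):

```typescript
/**
 * Fire-and-forget: start the job, return immediately, route any failure to
 * onError. The .catch is essential; a floating rejected promise terminates
 * modern Node processes with an unhandled-rejection error.
 */
function dispatchInBackground(
  job: () => Promise<void>,
  onError: (err: unknown) => void
): void {
  void job().catch(onError)
}
```

The trade-off, as the contract change below reflects, is that the caller no longer learns the outcome synchronously; any result (here, the triggered count) has to be reported through a later channel or dropped.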

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/new-column-dropdown/new-column-dropdown.tsx

Lines changed: 6 additions & 1 deletion
@@ -8,9 +8,14 @@ import {
   DropdownMenuTrigger,
 } from '@/components/emcn'
 import { Plus } from '@/components/emcn/icons'
+import { isWorkflowColumnsEnabledClient } from '@/lib/core/config/feature-flags'
 import type { ColumnDefinition } from '@/lib/table'
 import { COLUMN_TYPE_OPTIONS } from '../column-config-sidebar'

+const VISIBLE_COLUMN_TYPE_OPTIONS = isWorkflowColumnsEnabledClient
+  ? COLUMN_TYPE_OPTIONS
+  : COLUMN_TYPE_OPTIONS.filter((o) => o.type !== 'workflow')
+
 const CELL_HEADER =
   'border-[var(--border)] border-r border-b bg-[var(--bg)] px-2 py-[7px] text-left align-middle'

@@ -56,7 +61,7 @@ export function NewColumnDropdown({
       )}
     </DropdownMenuTrigger>
     <DropdownMenuContent align='start' side='bottom' sideOffset={4}>
-      {COLUMN_TYPE_OPTIONS.map((option) => {
+      {VISIBLE_COLUMN_TYPE_OPTIONS.map((option) => {
         const Icon = option.icon
         const onSelect =
           option.type === 'workflow'

apps/sim/hooks/queries/tables.ts

Lines changed: 158 additions & 25 deletions
@@ -4,6 +4,7 @@
  * React Query hooks for managing user-defined tables.
  */

+import { useEffect, useMemo } from 'react'
 import { createLogger } from '@sim/logger'
 import {
   type InfiniteData,
@@ -68,7 +69,7 @@ import type {
   WorkflowGroupDependencies,
   WorkflowGroupOutput,
 } from '@/lib/table'
-import { optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps'
+import { areOutputsFilled, optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps'

 /** Short poll to surface running → completed transitions from the server without a dedicated realtime channel. */
 const ROWS_POLL_INTERVAL_WHILE_RUNNING_MS = 1500
@@ -84,12 +85,75 @@ function hasRunningGroupExecution(rows: TableRow[] | undefined): boolean {
   return false
 }

-function hasRunningGroupExecutionInPages(pages: TableRowsResponse[] | undefined): boolean {
-  if (!pages) return false
-  for (const page of pages) {
-    if (hasRunningGroupExecution(page.rows)) return true
+/**
+ * Shallow-equality on the fields the renderer reads from a row. Row data is
+ * also shallow-compared — workflow output cells are scalars, so `===` per key
+ * suffices. `executions` is a per-group exec metadata object; we compare each
+ * group's `(status, jobId, executionId, error)` tuple. Any deeper drift forces
+ * a fresh row reference, which is the safe default.
+ */
+function rowEqual(a: TableRow, b: TableRow): boolean {
+  if (a === b) return true
+  if (a.position !== b.position) return false
+  const aData = a.data ?? {}
+  const bData = b.data ?? {}
+  const aDataKeys = Object.keys(aData)
+  if (aDataKeys.length !== Object.keys(bData).length) return false
+  for (const k of aDataKeys) {
+    if (aData[k] !== bData[k]) return false
   }
-  return false
+  const aExec = a.executions ?? {}
+  const bExec = b.executions ?? {}
+  const aExecKeys = Object.keys(aExec)
+  if (aExecKeys.length !== Object.keys(bExec).length) return false
+  for (const k of aExecKeys) {
+    const ax = aExec[k]
+    const bx = bExec[k]
+    if (ax === bx) continue
+    if (!ax || !bx) return false
+    if (
+      ax.status !== bx.status ||
+      ax.jobId !== bx.jobId ||
+      ax.executionId !== bx.executionId ||
+      ax.error !== bx.error
+    ) {
+      return false
+    }
+  }
+  return true
+}
+
+/**
+ * Replaces `prev.rows` element-by-element with `fresh.rows`, but reuses the
+ * `prev` reference for any row that hasn't changed. Memoized `<DataRow>`
+ * children short-circuit on row-identity, so a poll tick that arrives with
+ * 1000 rows but only flips the status of 5 only re-renders those 5 instead
+ * of every row in the page.
+ */
+function mergePagePreservingIdentity(
+  prev: TableRowsResponse,
+  fresh: TableRowsResponse
+): TableRowsResponse {
+  if (prev.rows === fresh.rows) return prev
+  const oldById = new Map(prev.rows.map((r) => [r.id, r]))
+  let changed = false
+  const merged = fresh.rows.map((freshRow) => {
+    const old = oldById.get(freshRow.id)
+    if (old && rowEqual(old, freshRow)) return old
+    changed = true
+    return freshRow
+  })
+  if (!changed && merged.length === prev.rows.length) {
+    let identical = true
+    for (let i = 0; i < merged.length; i++) {
+      if (merged[i] !== prev.rows[i]) {
+        identical = false
+        break
+      }
+    }
+    if (identical && prev.totalCount === fresh.totalCount) return prev
+  }
+  return { ...fresh, rows: merged }
 }

 const logger = createLogger('TableQueries')
@@ -293,9 +357,10 @@ export function useInfiniteTableRows({
     filter: filter ?? null,
     sort: sort ?? null,
   })
+  const queryKey = useMemo(() => tableKeys.infiniteRows(tableId, paramsKey), [tableId, paramsKey])

-  return useInfiniteQuery({
-    queryKey: tableKeys.infiniteRows(tableId, paramsKey),
+  const query = useInfiniteQuery({
+    queryKey,
     queryFn: ({ pageParam, signal }) =>
       fetchTableRows({
         workspaceId,
@@ -314,23 +379,79 @@
     },
     enabled: Boolean(workspaceId && tableId) && enabled,
     staleTime: 30 * 1000,
-    /**
-     * Poll while any row has a `pending` or `running` group execution.
-     * Realtime sockets push every cell write, but cross-network paths
-     * (trigger.dev workers → realtime ECS, client through CloudFront/proxy)
-     * occasionally drop events. Polling at the running cadence is the
-     * safety net so cells reach their terminal state without a refresh.
-     * No polling when nothing is running and no polling while a mutation
-     * is in flight (optimistic-update guard).
-     */
-    refetchInterval: (query) => {
-      if (queryClient.isMutating() > 0) return false
-      return hasRunningGroupExecutionInPages(query.state.data?.pages)
-        ? ROWS_POLL_INTERVAL_WHILE_RUNNING_MS
-        : false
-    },
-    refetchIntervalInBackground: false,
   })
+
+  /**
+   * Per-page polling. Built-in `refetchInterval` would refetch every loaded
+   * page on each tick — wasteful when only one page has running cells.
+   * Instead, walk pages each tick and refetch ONLY the dirty ones, splicing
+   * results back into the cache. Polling stops when no page has in-flight
+   * cells, or while a mutation is running (optimistic-update guard).
+   */
+  useEffect(() => {
+    if (!enabled || !workspaceId || !tableId) return
+    let cancelled = false
+    let timeoutId: ReturnType<typeof setTimeout> | null = null
+    const tick = async () => {
+      if (cancelled) return
+      if (queryClient.isMutating() === 0) {
+        const data = queryClient.getQueryData<InfiniteData<TableRowsResponse, number>>(queryKey)
+        const dirty: number[] = []
+        if (data) {
+          for (let i = 0; i < data.pages.length; i++) {
+            if (hasRunningGroupExecution(data.pages[i].rows)) {
+              dirty.push(data.pageParams[i] ?? i * pageSize)
+            }
+          }
+        }
+        if (dirty.length > 0) {
+          await Promise.all(
+            dirty.map(async (offset) => {
+              try {
+                const fresh = await fetchTableRows({
+                  workspaceId,
+                  tableId,
+                  limit: pageSize,
+                  offset,
+                  filter,
+                  sort,
+                  includeTotal: offset === 0,
+                })
+                if (cancelled) return
+                queryClient.setQueryData<InfiniteData<TableRowsResponse, number>>(
+                  queryKey,
+                  (prev) => {
+                    if (!prev) return prev
+                    const idx = prev.pageParams.indexOf(offset)
+                    if (idx === -1) return prev
+                    const merged = mergePagePreservingIdentity(prev.pages[idx], fresh)
+                    if (merged === prev.pages[idx]) return prev
+                    const nextPages = prev.pages.slice()
+                    nextPages[idx] = merged
+                    return { ...prev, pages: nextPages }
+                  }
+                )
+              } catch {
+                // Transient fetch failure — next tick retries. Don't kill the loop.
+              }
+            })
+          )
+        }
+      }
+      if (cancelled) return
+      // Recursive setTimeout instead of setInterval so a slow tick can't
+      // overlap the next one — out-of-order responses would otherwise let
+      // stale data overwrite fresh.
+      timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS)
+    }
+    timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS)
+    return () => {
+      cancelled = true
+      if (timeoutId !== null) clearTimeout(timeoutId)
+    }
+  }, [enabled, workspaceId, tableId, pageSize, filter, sort, queryClient, queryKey])
+
+  return query
 }

 /**
@@ -1176,6 +1297,10 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
     onMutate: async ({ groupIds, runMode = 'all', rowIds }) => {
       const targetRowIds = rowIds && rowIds.length > 0 ? new Set(rowIds) : null
       const targetGroupIds = new Set(groupIds)
+      const groups =
+        queryClient.getQueryData<TableDefinition>(tableKeys.detail(tableId))?.schema
+          .workflowGroups ?? []
+      const groupsById = new Map(groups.map((g) => [g.id, g]))
       const snapshots = await snapshotAndMutateRows(queryClient, tableId, (r) => {
         if (targetRowIds && !targetRowIds.has(r.id)) return null
         const executions = r.executions ?? {}
@@ -1184,7 +1309,15 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
         for (const groupId of targetGroupIds) {
           const exec = executions[groupId] as RowExecutionMetadata | undefined
           if (isOptimisticInFlight(exec)) continue
-          if (runMode === 'incomplete' && exec?.status === 'completed') continue
+          // Mirror server eligibility for `mode: 'incomplete'`: skip cells whose
+          // outputs are filled, regardless of exec status. A cancelled/error
+          // cell with a leftover value from a prior run was rendering as filled
+          // but flipping to "queued" optimistically here even though the server
+          // would skip it.
+          if (runMode === 'incomplete') {
+            const group = groupsById.get(groupId)
+            if (group && areOutputsFilled(group, r)) continue
+          }
           next[groupId] = buildPendingExec(exec)
           changed = true
         }
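The identity-preserving merge above is easy to exercise in isolation. A reduced sketch of the same technique, with rows cut down to `id` plus `data` and helper names that are hypothetical rather than the repo's exports:

```typescript
interface Row {
  id: string
  data: Record<string, unknown>
}

// Shallow compare of the row payload; scalar cell values make === per key enough.
function rowsEqual(a: Row, b: Row): boolean {
  if (a === b) return true
  const aKeys = Object.keys(a.data)
  if (aKeys.length !== Object.keys(b.data).length) return false
  return aKeys.every((k) => a.data[k] === b.data[k])
}

// Reuse the cached reference for unchanged rows so memoized renderers
// (e.g. React.memo on a row component) can skip re-rendering them.
function mergePreservingIdentity(prev: Row[], fresh: Row[]): Row[] {
  const oldById = new Map(prev.map((r) => [r.id, r]))
  return fresh.map((f) => {
    const old = oldById.get(f.id)
    return old && rowsEqual(old, f) ? old : f
  })
}
```

The point is reference identity, not value equality: a memoized child bails out on `old === new`, so handing back the cached object for an unchanged row is what turns a 1000-row poll tick into a handful of re-renders.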

apps/sim/lib/api/contracts/tables.ts

Lines changed: 6 additions & 1 deletion
@@ -895,7 +895,12 @@ export const runColumnContract = defineRouteContract({
   body: runColumnBodySchema,
   response: {
     mode: 'json',
-    schema: successResponseSchema(z.object({ triggered: z.number() })),
+    /**
+     * `triggered` is `null` when the dispatcher runs in the background — the
+     * actual count is only known after a fan-out that may be tens of thousands
+     * of rows, and we don't hold the HTTP response open for that long.
+     */
+    schema: successResponseSchema(z.object({ triggered: z.number().nullable() })),
   },
 })

apps/sim/lib/copilot/tools/server/table/user-table.ts

Lines changed: 8 additions & 3 deletions
@@ -1415,19 +1415,24 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
       }
       const requestId = generateId().slice(0, 8)
       assertNotAborted()
-      const { triggered } = await runWorkflowColumn({
+      // Dispatch in the background — large fan-outs (thousands of rows)
+      // issue sequential trigger.dev calls and would otherwise hold the
+      // tool span open for minutes, blocking the chat connection.
+      void runWorkflowColumn({
        tableId: args.tableId,
        workspaceId,
        groupIds,
        mode: runMode,
        rowIds,
        requestId,
+      }).catch((err) => {
+        logger.error(`[${requestId}] run_column dispatch failed`, err)
      })
      const scopeLabel = rowIds ? `${rowIds.length} row(s) by id` : runMode
      return {
        success: true,
-        message: `Triggered ${triggered} row(s) across ${groupIds.length} column(s) (${scopeLabel})`,
-        data: { triggered },
+        message: `Started running ${groupIds.length} column(s) (${scopeLabel}). Cells will populate as workflows complete.`,
+        data: { triggered: null },
      }
    }

apps/sim/lib/core/config/env.ts

Lines changed: 2 additions & 0 deletions
@@ -455,6 +455,7 @@ export const env = createEnv({
     NEXT_PUBLIC_AUDIT_LOGS_ENABLED: z.boolean().optional(), // Enable audit logs on self-hosted (bypasses hosted requirements)
     NEXT_PUBLIC_DATA_RETENTION_ENABLED: z.boolean().optional(), // Enable data retention settings on self-hosted (bypasses hosted requirements)
     NEXT_PUBLIC_DATA_DRAINS_ENABLED: z.boolean().optional(), // Enable data drains on self-hosted (bypasses hosted requirements)
+    NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED: z.boolean().optional(), // Show the "Workflow" column type in user tables (defaults to false)
     NEXT_PUBLIC_ORGANIZATIONS_ENABLED: z.boolean().optional(), // Enable organizations on self-hosted (bypasses plan requirements)
     NEXT_PUBLIC_DISABLE_INVITATIONS: z.boolean().optional(), // Disable workspace invitations globally (for self-hosted deployments)
     NEXT_PUBLIC_DISABLE_PUBLIC_API: z.boolean().optional(), // Disable public API access UI toggle globally
@@ -493,6 +494,7 @@
     NEXT_PUBLIC_AUDIT_LOGS_ENABLED: process.env.NEXT_PUBLIC_AUDIT_LOGS_ENABLED,
     NEXT_PUBLIC_DATA_RETENTION_ENABLED: process.env.NEXT_PUBLIC_DATA_RETENTION_ENABLED,
     NEXT_PUBLIC_DATA_DRAINS_ENABLED: process.env.NEXT_PUBLIC_DATA_DRAINS_ENABLED,
+    NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED: process.env.NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED,
     NEXT_PUBLIC_ORGANIZATIONS_ENABLED: process.env.NEXT_PUBLIC_ORGANIZATIONS_ENABLED,
     NEXT_PUBLIC_DISABLE_INVITATIONS: process.env.NEXT_PUBLIC_DISABLE_INVITATIONS,
     NEXT_PUBLIC_DISABLE_PUBLIC_API: process.env.NEXT_PUBLIC_DISABLE_PUBLIC_API,

apps/sim/lib/core/config/feature-flags.ts

Lines changed: 9 additions & 0 deletions
@@ -141,6 +141,15 @@ export const isDataRetentionEnabled = isTruthy(env.DATA_RETENTION_ENABLED)
  */
 export const isDataDrainsEnabled = isTruthy(env.DATA_DRAINS_ENABLED)

+/**
+ * Are workflow output columns enabled in user tables.
+ * Defaults to false; set NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED=true to show
+ * the "Workflow" column type in the new-column dropdown.
+ */
+export const isWorkflowColumnsEnabledClient = isTruthy(
+  getEnv('NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED')
+)
+
 /**
  * Is E2B enabled for remote code execution
  */
