Skip to content

Commit 6ad5523

Browse files
committed
improvement(tables): remove polling, eager drain, and parallelize batch updates
- Drop the per-page polling loop — SSE stream already patches execution cell state in real time and invalidates on buffer prune; polling was redundant and burned CPU/network on every open table
- Remove eager mount drain (fetchNextPage loop in use-table.ts); scroll handler and ensureAllRowsLoaded handle progressive/on-demand loading
- Parallelize chunkBatchUpdates with a 3-worker pool instead of serial chunks, reducing bulk-op round-trips by ~3x
- Delete mergePagePreservingIdentity and its tests (no longer called)
1 parent a89ee35 commit 6ad5523

4 files changed

Lines changed: 15 additions & 199 deletions

File tree

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,16 +198,27 @@ interface TableGridProps {
198198
* an all-or-nothing result and partial success cannot leave the table in an
199199
* ambiguous half-cleared state.
200200
*/
201+
const CHUNK_CONCURRENCY = 3
202+
201203
async function chunkBatchUpdates(
202204
updates: Array<{ rowId: string; data: Record<string, unknown> }>,
203205
mutateAsync: (args: {
204206
updates: Array<{ rowId: string; data: Record<string, unknown> }>
205207
}) => Promise<unknown>
206208
): Promise<void> {
207209
const size = TABLE_LIMITS.MAX_BULK_OPERATION_SIZE
210+
const chunks: Array<Array<{ rowId: string; data: Record<string, unknown> }>> = []
208211
for (let i = 0; i < updates.length; i += size) {
209-
await mutateAsync({ updates: updates.slice(i, i + size) })
212+
chunks.push(updates.slice(i, i + size))
210213
}
214+
let cursor = 0
215+
await Promise.all(
216+
Array.from({ length: Math.min(CHUNK_CONCURRENCY, chunks.length) }, async () => {
217+
while (cursor < chunks.length) {
218+
await mutateAsync({ updates: chunks[cursor++]! })
219+
}
220+
})
221+
)
211222
}
212223

213224
export function TableGrid({

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use client'
22

3-
import { useCallback, useEffect, useMemo } from 'react'
3+
import { useCallback, useMemo } from 'react'
44
import { useQueryClient } from '@tanstack/react-query'
55
import type { ColumnDefinition, TableDefinition, TableRow, WorkflowGroup } from '@/lib/table'
66
import { TABLE_LIMITS } from '@/lib/table/constants'
@@ -84,13 +84,6 @@ export function useTable({ workspaceId, tableId, queryOptions }: UseTableParams)
8484
enabled: Boolean(workspaceId && tableId),
8585
})
8686

87-
// prefetchInfiniteQuery is a no-op when data is fresh (staleTime not exceeded),
88-
// so drive the drain through fetchNextPage — it appends one page at a time.
89-
useEffect(() => {
90-
if (!workspaceId || !tableId || !hasNextPage || isFetchingNextPage) return
91-
void fetchNextPage()
92-
}, [workspaceId, tableId, hasNextPage, isFetchingNextPage, fetchNextPage])
93-
9487
const rows = useMemo<TableRow[]>(
9588
() => rowsData?.pages.flatMap((p) => p.rows) ?? [],
9689
[rowsData?.pages]

apps/sim/hooks/queries/tables.test.ts

Lines changed: 0 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ vi.mock('@/components/emcn', () => ({
8282
}))
8383

8484
import {
85-
mergePagePreservingIdentity,
8685
tableKeys,
8786
tableRowsInfiniteOptions,
8887
tableRowsParamsKey,
@@ -371,75 +370,3 @@ describe('tableRowsInfiniteOptions', () => {
371370
expect(JSON.stringify(opts1.queryKey)).not.toBe(JSON.stringify(opts2.queryKey))
372371
})
373372
})
374-
375-
describe('mergePagePreservingIdentity', () => {
376-
const ts = '2024-01-01T00:00:00.000Z'
377-
const ts2 = '2024-01-02T00:00:00.000Z'
378-
379-
function makeRow(id: string, updatedAt: string, extra?: Record<string, unknown>) {
380-
return { id, updatedAt, data: { value: id, ...extra } }
381-
}
382-
383-
it('returns fresh when totalCount differs', () => {
384-
const prev = { rows: [makeRow('r1', ts)], totalCount: 1, nextOffset: undefined }
385-
const fresh = { rows: [makeRow('r1', ts)], totalCount: 2, nextOffset: undefined }
386-
const result = mergePagePreservingIdentity(prev, fresh)
387-
expect(result).toBe(fresh)
388-
})
389-
390-
it('returns fresh when row counts differ', () => {
391-
const prev = { rows: [makeRow('r1', ts)], totalCount: 2, nextOffset: undefined }
392-
const fresh = {
393-
rows: [makeRow('r1', ts), makeRow('r2', ts)],
394-
totalCount: 2,
395-
nextOffset: undefined,
396-
}
397-
const result = mergePagePreservingIdentity(prev, fresh)
398-
expect(result).toBe(fresh)
399-
})
400-
401-
it('returns prev (same reference) when all rows are unchanged', () => {
402-
const row1 = makeRow('r1', ts)
403-
const row2 = makeRow('r2', ts)
404-
const prev = { rows: [row1, row2], totalCount: 2, nextOffset: undefined }
405-
const freshRow1 = makeRow('r1', ts)
406-
const fresh = { rows: [freshRow1, makeRow('r2', ts)], totalCount: 2, nextOffset: undefined }
407-
const result = mergePagePreservingIdentity(prev, fresh)
408-
expect(result).toBe(prev)
409-
})
410-
411-
it('preserves identity for unchanged rows, uses fresh for updated rows', () => {
412-
const row1 = makeRow('r1', ts)
413-
const row2 = makeRow('r2', ts)
414-
const prev = { rows: [row1, row2], totalCount: 2, nextOffset: undefined }
415-
const updatedRow2 = makeRow('r2', ts2, { extra: 'new' })
416-
const fresh = { rows: [makeRow('r1', ts), updatedRow2], totalCount: 2, nextOffset: undefined }
417-
const result = mergePagePreservingIdentity(prev, fresh)
418-
expect(result).not.toBe(prev)
419-
expect(result.rows[0]).toBe(row1)
420-
expect(result.rows[1]).toBe(updatedRow2)
421-
})
422-
423-
it('uses fresh row when ID is not found in prev', () => {
424-
const row1 = makeRow('r1', ts)
425-
const prev = { rows: [row1, makeRow('r2', ts)], totalCount: 2, nextOffset: undefined }
426-
const newRow = makeRow('r3', ts)
427-
const fresh = { rows: [makeRow('r1', ts), newRow], totalCount: 2, nextOffset: undefined }
428-
const result = mergePagePreservingIdentity(prev, fresh)
429-
expect(result.rows[1]).toBe(newRow)
430-
})
431-
432-
it('compares updatedAt as dates, not strings (ISO vs different string forms)', () => {
433-
const row1 = makeRow('r1', ts)
434-
const prev = { rows: [row1], totalCount: 1, nextOffset: undefined }
435-
// Same point in time, different ISO string representation (with trailing Z vs +00:00)
436-
const sameTimeDifferentFormat = '2024-01-01T00:00:00+00:00'
437-
const fresh = {
438-
rows: [makeRow('r1', sameTimeDifferentFormat)],
439-
totalCount: 1,
440-
nextOffset: undefined,
441-
}
442-
const result = mergePagePreservingIdentity(prev, fresh)
443-
expect(result).toBe(prev)
444-
})
445-
})

apps/sim/hooks/queries/tables.ts

Lines changed: 2 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
* React Query hooks for managing user-defined tables.
55
*/
66

7-
import { useEffect, useMemo } from 'react'
87
import { createLogger } from '@sim/logger'
98
import {
109
type InfiniteData,
@@ -70,17 +69,10 @@ import type {
7069
WorkflowGroupDependencies,
7170
WorkflowGroupOutput,
7271
} from '@/lib/table'
73-
import {
74-
areOutputsFilled,
75-
hasRunningGroupExecution,
76-
optimisticallyScheduleNewlyEligibleGroups,
77-
} from '@/lib/table/deps'
72+
import { areOutputsFilled, optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps'
7873

7974
const logger = createLogger('TableQueries')
8075

81-
const ROWS_POLL_INTERVAL_WHILE_RUNNING_MS = 2_000
82-
const ROWS_POLL_INTERVAL_IDLE_MS = 30_000
83-
8476
type TableQueryScope = 'active' | 'archived' | 'all'
8577

8678
export const tableKeys = {
@@ -251,24 +243,6 @@ export function useTableRows({
251243
})
252244
}
253245

254-
/** @internal — exported for testing only. */
255-
export function mergePagePreservingIdentity(
256-
prev: TableRowsResponse,
257-
fresh: TableRowsResponse
258-
): TableRowsResponse {
259-
if (prev.totalCount !== fresh.totalCount || prev.rows.length !== fresh.rows.length) return fresh
260-
const prevById = new Map(prev.rows.map((r) => [r.id, r]))
261-
let allSame = true
262-
const nextRows = fresh.rows.map((freshRow) => {
263-
const prevRow = prevById.get(freshRow.id)
264-
if (prevRow && new Date(prevRow.updatedAt).getTime() === new Date(freshRow.updatedAt).getTime())
265-
return prevRow
266-
allSame = false
267-
return freshRow
268-
})
269-
return allSame ? prev : { ...fresh, rows: nextRows }
270-
}
271-
272246
export function tableRowsParamsKey({
273247
pageSize,
274248
filter,
@@ -316,99 +290,10 @@ export function useInfiniteTableRows({
316290
sort,
317291
enabled = true,
318292
}: InfiniteTableRowsParams) {
319-
const queryClient = useQueryClient()
320-
const paramsKey = tableRowsParamsKey({ pageSize, filter, sort })
321-
// Memoize the key so the polling useEffect below doesn't fire on every render.
322-
const queryKey = useMemo(() => tableKeys.infiniteRows(tableId, paramsKey), [tableId, paramsKey])
323-
324-
const query = useInfiniteQuery({
293+
return useInfiniteQuery({
325294
...tableRowsInfiniteOptions({ workspaceId, tableId, pageSize, filter, sort }),
326-
queryKey,
327295
enabled: Boolean(workspaceId && tableId) && enabled,
328296
})
329-
330-
/**
331-
* Per-page polling. Built-in `refetchInterval` would refetch every loaded
332-
* page on each tick — wasteful when only one page has running cells.
333-
* Instead, walk pages each tick and refetch ONLY the dirty ones, splicing
334-
* results back into the cache. Polling stops when no page has in-flight
335-
* cells, or while a mutation is running (optimistic-update guard).
336-
*/
337-
useEffect(() => {
338-
if (!enabled || !workspaceId || !tableId) return
339-
let cancelled = false
340-
let timeoutId: ReturnType<typeof setTimeout> | null = null
341-
const tick = async () => {
342-
if (cancelled) return
343-
let hasDirty = false
344-
if (queryClient.isMutating() !== 0) {
345-
// Mutation in progress — skip network fetch to avoid racing optimistic
346-
// updates, but stay on the short interval so we catch up quickly once
347-
// the mutation settles.
348-
hasDirty = true
349-
} else {
350-
const data = queryClient.getQueryData<InfiniteData<TableRowsResponse, number>>(queryKey)
351-
const dirty: number[] = []
352-
if (data) {
353-
for (let i = 0; i < data.pages.length; i++) {
354-
if (hasRunningGroupExecution(data.pages[i].rows)) {
355-
dirty.push(data.pageParams[i] ?? i * pageSize)
356-
}
357-
}
358-
}
359-
hasDirty = dirty.length > 0
360-
if (hasDirty) {
361-
await Promise.all(
362-
dirty.map(async (offset) => {
363-
try {
364-
const fresh = await fetchTableRows({
365-
workspaceId,
366-
tableId,
367-
limit: pageSize,
368-
offset,
369-
filter,
370-
sort,
371-
includeTotal: offset === 0,
372-
})
373-
if (cancelled) return
374-
queryClient.setQueryData<InfiniteData<TableRowsResponse, number>>(
375-
queryKey,
376-
(prev) => {
377-
if (!prev) return prev
378-
const idx = prev.pageParams.indexOf(offset)
379-
if (idx === -1) return prev
380-
const merged = mergePagePreservingIdentity(prev.pages[idx], fresh)
381-
if (merged === prev.pages[idx]) return prev
382-
const nextPages = prev.pages.slice()
383-
nextPages[idx] = merged
384-
return { ...prev, pages: nextPages }
385-
}
386-
)
387-
} catch {
388-
// Transient fetch failure — next tick retries. Don't kill the loop.
389-
}
390-
})
391-
)
392-
}
393-
}
394-
if (cancelled) return
395-
// Recursive setTimeout instead of setInterval so a slow tick can't
396-
// overlap the next one — out-of-order responses would otherwise let
397-
// stale data overwrite fresh. Use a long interval when idle so tables
398-
// with no running executions don't burn CPU on constant cache reads.
399-
timeoutId = setTimeout(
400-
() => void tick(),
401-
hasDirty ? ROWS_POLL_INTERVAL_WHILE_RUNNING_MS : ROWS_POLL_INTERVAL_IDLE_MS
402-
)
403-
}
404-
timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS)
405-
return () => {
406-
cancelled = true
407-
if (timeoutId !== null) clearTimeout(timeoutId)
408-
}
409-
}, [enabled, workspaceId, tableId, pageSize, filter, sort, queryClient, queryKey])
410-
411-
return query
412297
}
413298

414299
/**

0 commit comments

Comments (0)