Skip to content

Commit 64197b5

Browse files
chore(table): simplify SSE plumbing — reuse helpers, drop dead polling code
- Reuse snapshotAndMutateRows for SSE cache patches instead of reimplementing the page-walk + cache-shape detection. Adds a {cancelInFlight: false} opt for the SSE caller (mutations still cancel as before). - Drop client-side type duplication in use-table-event-stream — import TableEvent and TableEventEntry from lib/table/events directly. - Drop the now-dead mergePagePreservingIdentity + rowEqual from tables.ts; their only caller was the polling effect that was removed earlier. - Drop the defensive try/catch around appendTableEvent in cell-write — the function is documented as never-throwing (returns null on failure). - Combine INCR + ZADD into one Lua eval in events.ts. Halves Redis RTT per cell-write. Lua returns the new eventId; the script splices it into the pre-built entry JSON. - Trim refs to plain let bindings inside the effect; trim stale comments referencing the old polling implementation.
1 parent 56ac40c commit 64197b5

5 files changed

Lines changed: 96 additions & 287 deletions

File tree

Lines changed: 48 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,14 @@
11
'use client'
22

3-
import { useEffect, useRef } from 'react'
3+
import { useEffect } from 'react'
44
import { createLogger } from '@sim/logger'
55
import { useQueryClient } from '@tanstack/react-query'
6-
import { tableKeys, type TableRowsResponse } from '@/hooks/queries/tables'
7-
import type { RowData, RowExecutionMetadata, RowExecutions, TableRow } from '@/lib/table'
6+
import { snapshotAndMutateRows, tableKeys } from '@/hooks/queries/tables'
7+
import type { TableEvent, TableEventEntry } from '@/lib/table/events'
8+
import type { RowData, RowExecutionMetadata, RowExecutions } from '@/lib/table'
89

910
const logger = createLogger('useTableEventStream')
1011

11-
/** Mirrors the server-side `TableCellStatus` from `apps/sim/lib/table/events.ts`. */
12-
type TableCellStatus = 'pending' | 'queued' | 'running' | 'completed' | 'cancelled' | 'error'
13-
14-
interface TableCellEvent {
15-
kind: 'cell'
16-
tableId: string
17-
rowId: string
18-
groupId: string
19-
status: TableCellStatus
20-
executionId: string | null
21-
jobId: string | null
22-
error: string | null
23-
outputs?: Record<string, unknown>
24-
}
25-
26-
interface TableEventEntry {
27-
eventId: number
28-
tableId: string
29-
event: TableCellEvent
30-
}
31-
3212
interface PrunedEvent {
3313
earliestEventId: number | null
3414
}
@@ -43,17 +23,12 @@ interface UseTableEventStreamArgs {
4323

4424
/**
4525
* Subscribes to the table's SSE event stream and patches the React Query
46-
* cache as cell-state events arrive. Replaces polling — once the page mounts,
47-
* cells flip in <100ms via push instead of waiting for the next poll tick.
48-
*
49-
* Reconnect-resume: on transport error, the hook reconnects with `from=` set
50-
* to the last seen `eventId`; the server replays anything missed from the
51-
* Redis-backed buffer. If the buffer has rolled past the gap (server returns
52-
* a `pruned` event), the hook full-refetches the row queries and resumes
53-
* streaming from the new earliest.
26+
* cache as cell-state events arrive.
5427
*
55-
* Returns nothing — the only side effect is keeping the cache live. Cleans
56-
* up the EventSource on unmount or argument change.
28+
* Reconnect-resume: on transport error, reconnects with `from=` set to the
29+
* last seen `eventId`; server replays missed events from the Redis-backed
30+
* buffer. If the gap exceeds buffer retention (server emits `pruned`), the
31+
* hook full-refetches the row queries and resumes from the new earliest.
5732
*/
5833
export function useTableEventStream({
5934
tableId,
@@ -62,64 +37,55 @@ export function useTableEventStream({
6237
}: UseTableEventStreamArgs): void {
6338
const queryClient = useQueryClient()
6439

65-
// Refs so the long-lived stream loop reads current values without forcing
66-
// effect re-subscription on every render.
67-
const lastEventIdRef = useRef(0)
68-
const reconnectAttemptRef = useRef(0)
69-
7040
useEffect(() => {
7141
if (!enabled || !tableId || !workspaceId) return
7242

7343
let cancelled = false
7444
let eventSource: EventSource | null = null
7545
let reconnectTimer: ReturnType<typeof setTimeout> | null = null
76-
// Reset the dedupe cursor on every fresh mount so a remount after
77-
// navigation doesn't accidentally skip events from a prior session.
78-
lastEventIdRef.current = 0
79-
reconnectAttemptRef.current = 0
80-
81-
const patchRow = (entry: TableEventEntry): void => {
82-
const { rowId, groupId, status, executionId, jobId, error, outputs } = entry.event
83-
const nextExec: RowExecutionMetadata = {
84-
status,
85-
executionId: executionId ?? null,
86-
jobId: jobId ?? null,
87-
// workflowId is required by the type but not in the SSE payload — we
88-
// preserve any prior value via the merge below; if there's no prior
89-
// value, the empty string is overwritten on the next refetch.
90-
workflowId: '',
91-
error: error ?? null,
92-
}
93-
94-
const queries = queryClient.getQueriesData<unknown>({
95-
queryKey: tableKeys.rowsRoot(tableId),
96-
})
97-
for (const [queryKey, data] of queries) {
98-
if (!data) continue
99-
const patched = patchCacheEntry(data, rowId, groupId, nextExec, outputs)
100-
if (patched !== data) {
101-
queryClient.setQueryData(queryKey, patched)
102-
}
103-
}
46+
let lastEventId = 0
47+
let reconnectAttempt = 0
48+
49+
const applyCell = (event: Extract<TableEvent, { kind: 'cell' }>): void => {
50+
const { rowId, groupId, status, executionId, jobId, error, outputs } = event
51+
void snapshotAndMutateRows(
52+
queryClient,
53+
tableId,
54+
(row) => {
55+
if (row.id !== rowId) return null
56+
const prevExec = row.executions?.[groupId]
57+
const nextExec: RowExecutionMetadata = {
58+
status,
59+
executionId: executionId ?? null,
60+
jobId: jobId ?? null,
61+
// Preserve workflowId from cache; SSE payload doesn't carry it.
62+
workflowId: prevExec?.workflowId ?? '',
63+
error: error ?? null,
64+
}
65+
const nextExecutions: RowExecutions = {
66+
...(row.executions ?? {}),
67+
[groupId]: nextExec,
68+
}
69+
const nextData: RowData = outputs
70+
? ({ ...row.data, ...outputs } as RowData)
71+
: row.data
72+
return { ...row, executions: nextExecutions, data: nextData }
73+
},
74+
{ cancelInFlight: false }
75+
)
10476
}
10577

10678
const handlePrune = (payload: PrunedEvent): void => {
10779
logger.info('Table event buffer pruned — full refetch', { tableId, ...payload })
10880
void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) })
109-
// Resume streaming from the new earliest. The next reconnect picks
110-
// this up via lastEventIdRef.current.
111-
if (typeof payload.earliestEventId === 'number') {
112-
lastEventIdRef.current = payload.earliestEventId
113-
} else {
114-
lastEventIdRef.current = 0
115-
}
81+
lastEventId = typeof payload.earliestEventId === 'number' ? payload.earliestEventId : 0
11682
}
11783

11884
const scheduleReconnect = (): void => {
11985
if (cancelled) return
120-
const attempt = Math.min(reconnectAttemptRef.current, RECONNECT_BACKOFF_MS.length - 1)
121-
const delay = RECONNECT_BACKOFF_MS[attempt]
122-
reconnectAttemptRef.current++
86+
const idx = Math.min(reconnectAttempt, RECONNECT_BACKOFF_MS.length - 1)
87+
const delay = RECONNECT_BACKOFF_MS[idx]
88+
reconnectAttempt++
12389
reconnectTimer = setTimeout(() => {
12490
reconnectTimer = null
12591
connect()
@@ -128,7 +94,7 @@ export function useTableEventStream({
12894

12995
const connect = (): void => {
13096
if (cancelled) return
131-
const url = `/api/table/${tableId}/events/stream?from=${lastEventIdRef.current}`
97+
const url = `/api/table/${tableId}/events/stream?from=${lastEventId}`
13298
try {
13399
eventSource = new EventSource(url)
134100
} catch (err) {
@@ -138,16 +104,16 @@ export function useTableEventStream({
138104
}
139105

140106
eventSource.onopen = () => {
141-
reconnectAttemptRef.current = 0
107+
reconnectAttempt = 0
142108
}
143109

144110
eventSource.onmessage = (msg: MessageEvent<string>) => {
145111
try {
146112
const entry = JSON.parse(msg.data) as TableEventEntry
147113
if (entry.event?.kind !== 'cell') return
148-
if (entry.eventId <= lastEventIdRef.current) return
149-
lastEventIdRef.current = entry.eventId
150-
patchRow(entry)
114+
if (entry.eventId <= lastEventId) return
115+
lastEventId = entry.eventId
116+
applyCell(entry.event)
151117
} catch (err) {
152118
logger.warn('Failed to parse table event', { tableId, err })
153119
}
@@ -162,7 +128,6 @@ export function useTableEventStream({
162128
})
163129

164130
eventSource.addEventListener('rotate', () => {
165-
// Server hit its defensive duration ceiling — close + reconnect.
166131
eventSource?.close()
167132
eventSource = null
168133
scheduleReconnect()
@@ -186,84 +151,3 @@ export function useTableEventStream({
186151
}
187152
}, [enabled, tableId, workspaceId, queryClient])
188153
}
189-
190-
/**
191-
* Returns a new cache entry with the given row's executions/data patched, or
192-
* the original reference if the row isn't in this entry. Handles both
193-
* single-page (`useTableRows`) and infinite (`useInfiniteTableRows`) shapes.
194-
*
195-
* Within a page we only allocate a new row object when it actually changes;
196-
* unchanged rows keep their reference so memoized `<DataRow>` short-circuits.
197-
*/
198-
function patchCacheEntry(
199-
entry: unknown,
200-
rowId: string,
201-
groupId: string,
202-
nextExec: RowExecutionMetadata,
203-
outputs: Record<string, unknown> | undefined
204-
): unknown {
205-
if (isInfiniteCache(entry)) {
206-
let touched = false
207-
const nextPages = entry.pages.map((page) => {
208-
const nextRows = patchRows(page.rows, rowId, groupId, nextExec, outputs)
209-
if (nextRows === page.rows) return page
210-
touched = true
211-
return { ...page, rows: nextRows }
212-
})
213-
if (!touched) return entry
214-
return { ...entry, pages: nextPages }
215-
}
216-
if (isSinglePage(entry)) {
217-
const nextRows = patchRows(entry.rows, rowId, groupId, nextExec, outputs)
218-
if (nextRows === entry.rows) return entry
219-
return { ...entry, rows: nextRows }
220-
}
221-
return entry
222-
}
223-
224-
function patchRows(
225-
rows: TableRow[],
226-
rowId: string,
227-
groupId: string,
228-
nextExec: RowExecutionMetadata,
229-
outputs: Record<string, unknown> | undefined
230-
): TableRow[] {
231-
let touched = false
232-
const next = rows.map((row) => {
233-
if (row.id !== rowId) return row
234-
const prevExec = row.executions?.[groupId]
235-
// Preserve the prior workflowId — the SSE payload doesn't carry it but
236-
// the cache row may already have it from the page query.
237-
const mergedExec: RowExecutionMetadata = {
238-
...nextExec,
239-
workflowId: prevExec?.workflowId ?? nextExec.workflowId,
240-
}
241-
const nextExecutions: RowExecutions = { ...(row.executions ?? {}), [groupId]: mergedExec }
242-
const nextData: RowData = outputs
243-
? ({ ...row.data, ...outputs } as RowData)
244-
: row.data
245-
touched = true
246-
return { ...row, executions: nextExecutions, data: nextData }
247-
})
248-
return touched ? next : rows
249-
}
250-
251-
interface InfiniteCache {
252-
pages: TableRowsResponse[]
253-
pageParams: number[]
254-
}
255-
256-
function isInfiniteCache(value: unknown): value is InfiniteCache {
257-
return (
258-
typeof value === 'object' &&
259-
value !== null &&
260-
Array.isArray((value as InfiniteCache).pages) &&
261-
Array.isArray((value as InfiniteCache).pageParams)
262-
)
263-
}
264-
265-
function isSinglePage(value: unknown): value is TableRowsResponse {
266-
return (
267-
typeof value === 'object' && value !== null && Array.isArray((value as TableRowsResponse).rows)
268-
)
269-
}

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ export function Table({
117117
const workspaceId = propWorkspaceId || (params.workspaceId as string)
118118
const tableId = propTableId || (params.tableId as string)
119119

120-
// Subscribe to per-cell SSE events for this table. Patches the row cache
121-
// as transitions arrive — replaces polling for live updates.
122120
useTableEventStream({ tableId, workspaceId })
123121

124122
const [slideout, dispatch] = useReducer(slideoutReducer, { kind: 'none' })

0 commit comments

Comments
 (0)