Skip to content

Commit af84a8a

Browse files
fix(table): address PR review on SSE buffer
- TTL-expiry silent miss: when all keys expire, hgetall(meta) returns empty so earliestEventId is undefined and the prune branch was skipped. Reconnect with non-zero afterEventId now checks the seq counter — its absence (TTL expired) signals pruned so the client refetches. The memory fallback mirrors this behavior.
- Unbounded ZRANGEBYSCORE: cap reads at TABLE_EVENT_READ_CHUNK = 500 events per call. The route's 500ms poll loop drains chunks across ticks instead of flushing 5000 entries (multi-MB) in one tick after a long disconnect.
- Pruned handler closes EventSource client-side: server-side close was firing onerror and routing through the 500ms backoff path. Now we close proactively, reset the reconnect attempt counter, and reconnect immediately from the new earliest event.
1 parent 64197b5 commit af84a8a

2 files changed

Lines changed: 38 additions & 3 deletions

File tree

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ export function useTableEventStream({
7979
logger.info('Table event buffer pruned — full refetch', { tableId, ...payload })
8080
void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) })
8181
lastEventId = typeof payload.earliestEventId === 'number' ? payload.earliestEventId : 0
82+
// Close proactively so the server's close doesn't fire onerror and route
83+
// through the backoff path. Reconnect immediately from the new cursor.
84+
eventSource?.close()
85+
eventSource = null
86+
reconnectAttempt = 0
87+
connect()
8288
}
8389

8490
const scheduleReconnect = (): void => {

apps/sim/lib/table/events.ts

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ const logger = createLogger('TableEventBuffer')
2424
const REDIS_PREFIX = 'table:stream:'
2525
export const TABLE_EVENT_TTL_SECONDS = 60 * 60 // 1 hour
2626
export const TABLE_EVENT_CAP = 5000
27+
/** Max events returned by a single read; the SSE route drains in chunks. */
28+
export const TABLE_EVENT_READ_CHUNK = 500
2729

2830
/**
2931
* Atomic append: INCR the seq counter to mint a new eventId, build the entry
@@ -155,13 +157,21 @@ function appendMemory(event: TableEvent): TableEventEntry {
155157
function readMemory(tableId: string, afterEventId: number): TableEventsReadResult {
156158
pruneExpiredMemoryStreams()
157159
const stream = memoryTableStreams.get(tableId)
158-
if (!stream) return { status: 'ok', events: [] }
160+
if (!stream) {
161+
// Mirror the Redis path: a non-zero afterEventId with no buffer at all
162+
// means TTL expired or the stream never existed; either way the caller's
163+
// cursor is stale.
164+
if (afterEventId > 0) return { status: 'pruned', earliestEventId: undefined }
165+
return { status: 'ok', events: [] }
166+
}
159167
if (stream.earliestEventId !== undefined && afterEventId + 1 < stream.earliestEventId) {
160168
return { status: 'pruned', earliestEventId: stream.earliestEventId }
161169
}
162170
return {
163171
status: 'ok',
164-
events: stream.events.filter((entry) => entry.eventId > afterEventId),
172+
events: stream.events
173+
.filter((entry) => entry.eventId > afterEventId)
174+
.slice(0, TABLE_EVENT_READ_CHUNK),
165175
}
166176
}
167177

@@ -239,7 +249,26 @@ export async function readTableEventsSince(
239249
if (earliestEventId !== undefined && afterEventId + 1 < earliestEventId) {
240250
return { status: 'pruned', earliestEventId }
241251
}
242-
const raw = await redis.zrangebyscore(getEventsKey(tableId), afterEventId + 1, '+inf')
252+
// Read in capped chunks so a 5000-event backlog doesn't materialize as one
253+
// multi-MB Redis reply + JSON parse + SSE flush. The route loop drains
254+
// chunks across ticks.
255+
const raw = await redis.zrangebyscore(
256+
getEventsKey(tableId),
257+
afterEventId + 1,
258+
'+inf',
259+
'LIMIT',
260+
0,
261+
TABLE_EVENT_READ_CHUNK
262+
)
263+
if (raw.length === 0 && afterEventId > 0) {
264+
// Total TTL expiry: events + meta both gone. The seq counter has the
265+
// same TTL — its absence means the buffer was wiped and the caller's
266+
// `afterEventId` is stale. Signal pruned so the client refetches.
267+
const seqExists = await redis.exists(getSeqKey(tableId))
268+
if (seqExists === 0) {
269+
return { status: 'pruned', earliestEventId: undefined }
270+
}
271+
}
243272
return {
244273
status: 'ok',
245274
events: raw

0 commit comments

Comments (0)