Skip to content

Commit 0cb6714

Browse files
fix(rooms): cleanup edge case for 1hr ttl (#3163)
* fix(rooms): cleanup edge case for 1hr ttl
* revert feature flags
* address comments
* remove console log
1 parent 7b36f92 commit 0cb6714

File tree

6 files changed

+101
-26
lines changed

6 files changed

+101
-26
lines changed

apps/sim/serializer/index.ts

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -70,7 +70,6 @@ function shouldSerializeSubBlock(
7070
: group.basicId === subBlockConfig.id
7171
return matchesMode && evaluateSubBlockCondition(subBlockConfig.condition, values)
7272
}
73-
console.log('[FUCK] subBlockConfig.condition', subBlockConfig.condition, values)
7473
return evaluateSubBlockCondition(subBlockConfig.condition, values)
7574
}
7675

apps/sim/socket/handlers/connection.ts

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,8 @@ export function setupConnectionHandlers(socket: AuthenticatedSocket, roomManager
2121
cleanupPendingSubblocksForSocket(socket.id)
2222
cleanupPendingVariablesForSocket(socket.id)
2323

24-
const workflowId = await roomManager.removeUserFromRoom(socket.id)
24+
const workflowIdHint = [...socket.rooms].find((roomId) => roomId !== socket.id)
25+
const workflowId = await roomManager.removeUserFromRoom(socket.id, workflowIdHint)
2526

2627
if (workflowId) {
2728
await roomManager.broadcastPresenceUpdate(workflowId)

apps/sim/socket/handlers/workflow.ts

Lines changed: 52 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -51,26 +51,66 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
5151
const currentWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
5252
if (currentWorkflowId) {
5353
socket.leave(currentWorkflowId)
54-
await roomManager.removeUserFromRoom(socket.id)
54+
await roomManager.removeUserFromRoom(socket.id, currentWorkflowId)
5555
await roomManager.broadcastPresenceUpdate(currentWorkflowId)
5656
}
5757

58-
const STALE_THRESHOLD_MS = 60_000
58+
// Keep this above Redis socket key TTL (1h) so a normal idle user is not evicted too aggressively.
59+
const STALE_THRESHOLD_MS = 75 * 60 * 1000
5960
const now = Date.now()
6061
const existingUsers = await roomManager.getWorkflowUsers(workflowId)
62+
let liveSocketIds = new Set<string>()
63+
let canCheckLiveness = false
64+
65+
try {
66+
const liveSockets = await roomManager.io.in(workflowId).fetchSockets()
67+
liveSocketIds = new Set(liveSockets.map((liveSocket) => liveSocket.id))
68+
canCheckLiveness = true
69+
} catch (error) {
70+
logger.warn(
71+
`Skipping stale cleanup for ${workflowId} due to live socket lookup failure`,
72+
error
73+
)
74+
}
75+
6176
for (const existingUser of existingUsers) {
62-
if (existingUser.userId === userId && existingUser.socketId !== socket.id) {
63-
const isSameTab = tabSessionId && existingUser.tabSessionId === tabSessionId
64-
const isStale =
65-
now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS
77+
try {
78+
if (existingUser.socketId === socket.id) {
79+
continue
80+
}
6681

67-
if (isSameTab || isStale) {
82+
const isSameTab = Boolean(
83+
existingUser.userId === userId &&
84+
tabSessionId &&
85+
existingUser.tabSessionId === tabSessionId
86+
)
87+
88+
if (isSameTab) {
6889
logger.info(
69-
`Cleaning up socket ${existingUser.socketId} for user ${userId} (${isSameTab ? 'same tab' : 'stale'})`
90+
`Cleaning up socket ${existingUser.socketId} for user ${existingUser.userId} (same tab)`
7091
)
71-
await roomManager.removeUserFromRoom(existingUser.socketId)
72-
roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
92+
await roomManager.removeUserFromRoom(existingUser.socketId, workflowId)
93+
await roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
94+
continue
95+
}
96+
97+
if (!canCheckLiveness || liveSocketIds.has(existingUser.socketId)) {
98+
continue
99+
}
100+
101+
const isStaleByActivity =
102+
now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS
103+
if (!isStaleByActivity) {
104+
continue
73105
}
106+
107+
logger.info(
108+
`Cleaning up socket ${existingUser.socketId} for user ${existingUser.userId} (stale activity)`
109+
)
110+
await roomManager.removeUserFromRoom(existingUser.socketId, workflowId)
111+
await roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
112+
} catch (error) {
113+
logger.warn(`Best-effort cleanup failed for socket ${existingUser.socketId}`, error)
74114
}
75115
}
76116

@@ -136,7 +176,7 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
136176
logger.error('Error joining workflow:', error)
137177
// Undo socket.join and room manager entry if any operation failed
138178
socket.leave(workflowId)
139-
await roomManager.removeUserFromRoom(socket.id)
179+
await roomManager.removeUserFromRoom(socket.id, workflowId)
140180
const isReady = roomManager.isReady()
141181
socket.emit('join-workflow-error', {
142182
error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
@@ -156,7 +196,7 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
156196

157197
if (workflowId && session) {
158198
socket.leave(workflowId)
159-
await roomManager.removeUserFromRoom(socket.id)
199+
await roomManager.removeUserFromRoom(socket.id, workflowId)
160200
await roomManager.broadcastPresenceUpdate(workflowId)
161201

162202
logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`)

apps/sim/socket/rooms/memory-manager.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,7 @@ export class MemoryRoomManager implements IRoomManager {
6666
logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`)
6767
}
6868

69-
async removeUserFromRoom(socketId: string): Promise<string | null> {
69+
async removeUserFromRoom(socketId: string, _workflowIdHint?: string): Promise<string | null> {
7070
const workflowId = this.socketToWorkflow.get(socketId)
7171

7272
if (!workflowId) {

apps/sim/socket/rooms/redis-manager.ts

Lines changed: 43 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -10,9 +10,11 @@ const KEYS = {
1010
workflowMeta: (wfId: string) => `workflow:${wfId}:meta`,
1111
socketWorkflow: (socketId: string) => `socket:${socketId}:workflow`,
1212
socketSession: (socketId: string) => `socket:${socketId}:session`,
13+
socketPresenceWorkflow: (socketId: string) => `socket:${socketId}:presence-workflow`,
1314
} as const
1415

1516
const SOCKET_KEY_TTL = 3600
17+
const SOCKET_PRESENCE_WORKFLOW_KEY_TTL = 24 * 60 * 60
1618

1719
/**
1820
* Lua script for atomic user removal from room.
@@ -22,11 +24,21 @@ const SOCKET_KEY_TTL = 3600
2224
const REMOVE_USER_SCRIPT = `
2325
local socketWorkflowKey = KEYS[1]
2426
local socketSessionKey = KEYS[2]
27+
local socketPresenceWorkflowKey = KEYS[3]
2528
local workflowUsersPrefix = ARGV[1]
2629
local workflowMetaPrefix = ARGV[2]
2730
local socketId = ARGV[3]
31+
local workflowIdHint = ARGV[4]
2832
2933
local workflowId = redis.call('GET', socketWorkflowKey)
34+
if not workflowId then
35+
workflowId = redis.call('GET', socketPresenceWorkflowKey)
36+
end
37+
38+
if not workflowId and workflowIdHint ~= '' then
39+
workflowId = workflowIdHint
40+
end
41+
3042
if not workflowId then
3143
return nil
3244
end
@@ -35,7 +47,7 @@ local workflowUsersKey = workflowUsersPrefix .. workflowId .. ':users'
3547
local workflowMetaKey = workflowMetaPrefix .. workflowId .. ':meta'
3648
3749
redis.call('HDEL', workflowUsersKey, socketId)
38-
redis.call('DEL', socketWorkflowKey, socketSessionKey)
50+
redis.call('DEL', socketWorkflowKey, socketSessionKey, socketPresenceWorkflowKey)
3951
4052
local remaining = redis.call('HLEN', workflowUsersKey)
4153
if remaining == 0 then
@@ -54,11 +66,13 @@ const UPDATE_ACTIVITY_SCRIPT = `
5466
local workflowUsersKey = KEYS[1]
5567
local socketWorkflowKey = KEYS[2]
5668
local socketSessionKey = KEYS[3]
69+
local socketPresenceWorkflowKey = KEYS[4]
5770
local socketId = ARGV[1]
5871
local cursorJson = ARGV[2]
5972
local selectionJson = ARGV[3]
6073
local lastActivity = ARGV[4]
6174
local ttl = tonumber(ARGV[5])
75+
local presenceWorkflowTtl = tonumber(ARGV[6])
6276
6377
local existingJson = redis.call('HGET', workflowUsersKey, socketId)
6478
if not existingJson then
@@ -78,6 +92,7 @@ existing.lastActivity = tonumber(lastActivity)
7892
redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing))
7993
redis.call('EXPIRE', socketWorkflowKey, ttl)
8094
redis.call('EXPIRE', socketSessionKey, ttl)
95+
redis.call('EXPIRE', socketPresenceWorkflowKey, presenceWorkflowTtl)
8196
return 1
8297
`
8398

@@ -164,6 +179,8 @@ export class RedisRoomManager implements IRoomManager {
164179
pipeline.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString())
165180
pipeline.set(KEYS.socketWorkflow(socketId), workflowId)
166181
pipeline.expire(KEYS.socketWorkflow(socketId), SOCKET_KEY_TTL)
182+
pipeline.set(KEYS.socketPresenceWorkflow(socketId), workflowId)
183+
pipeline.expire(KEYS.socketPresenceWorkflow(socketId), SOCKET_PRESENCE_WORKFLOW_KEY_TTL)
167184
pipeline.hSet(KEYS.socketSession(socketId), {
168185
userId: presence.userId,
169186
userName: presence.userName,
@@ -187,35 +204,50 @@ export class RedisRoomManager implements IRoomManager {
187204
}
188205
}
189206

190-
async removeUserFromRoom(socketId: string, retried = false): Promise<string | null> {
207+
async removeUserFromRoom(
208+
socketId: string,
209+
workflowIdHint?: string,
210+
retried = false
211+
): Promise<string | null> {
191212
if (!this.removeUserScriptSha) {
192213
logger.error('removeUserFromRoom called before initialize()')
193214
return null
194215
}
195216

196217
try {
197218
const workflowId = await this.redis.evalSha(this.removeUserScriptSha, {
198-
keys: [KEYS.socketWorkflow(socketId), KEYS.socketSession(socketId)],
199-
arguments: ['workflow:', 'workflow:', socketId],
219+
keys: [
220+
KEYS.socketWorkflow(socketId),
221+
KEYS.socketSession(socketId),
222+
KEYS.socketPresenceWorkflow(socketId),
223+
],
224+
arguments: ['workflow:', 'workflow:', socketId, workflowIdHint ?? ''],
200225
})
201226

202-
if (workflowId) {
227+
if (typeof workflowId === 'string' && workflowId.length > 0) {
203228
logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`)
229+
return workflowId
204230
}
205-
return workflowId as string | null
231+
232+
return null
206233
} catch (error) {
207234
if ((error as Error).message?.includes('NOSCRIPT') && !retried) {
208235
logger.warn('Lua script not found, reloading...')
209236
this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT)
210-
return this.removeUserFromRoom(socketId, true)
237+
return this.removeUserFromRoom(socketId, workflowIdHint, true)
211238
}
212239
logger.error(`Failed to remove user from room: ${socketId}`, error)
213240
return null
214241
}
215242
}
216243

217244
async getWorkflowIdForSocket(socketId: string): Promise<string | null> {
218-
return this.redis.get(KEYS.socketWorkflow(socketId))
245+
const workflowId = await this.redis.get(KEYS.socketWorkflow(socketId))
246+
if (workflowId) {
247+
return workflowId
248+
}
249+
250+
return this.redis.get(KEYS.socketPresenceWorkflow(socketId))
219251
}
220252

221253
async getUserSession(socketId: string): Promise<UserSession | null> {
@@ -278,13 +310,15 @@ export class RedisRoomManager implements IRoomManager {
278310
KEYS.workflowUsers(workflowId),
279311
KEYS.socketWorkflow(socketId),
280312
KEYS.socketSession(socketId),
313+
KEYS.socketPresenceWorkflow(socketId),
281314
],
282315
arguments: [
283316
socketId,
284317
updates.cursor !== undefined ? JSON.stringify(updates.cursor) : '',
285318
updates.selection !== undefined ? JSON.stringify(updates.selection) : '',
286319
(updates.lastActivity ?? Date.now()).toString(),
287320
SOCKET_KEY_TTL.toString(),
321+
SOCKET_PRESENCE_WORKFLOW_KEY_TTL.toString(),
288322
],
289323
})
290324
} catch (error) {
@@ -348,7 +382,7 @@ export class RedisRoomManager implements IRoomManager {
348382

349383
// Remove all users from Redis state
350384
for (const user of users) {
351-
await this.removeUserFromRoom(user.socketId)
385+
await this.removeUserFromRoom(user.socketId, workflowId)
352386
}
353387

354388
// Clean up room data

apps/sim/socket/rooms/types.ts

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -65,9 +65,10 @@ export interface IRoomManager {
6565

6666
/**
6767
* Remove a user from their current room
68-
* Returns the workflowId they were in, or null if not in any room
68+
* Optional workflowIdHint is used when socket mapping keys are missing/expired.
69+
* Returns the workflowId they were in, or null if not in any room.
6970
*/
70-
removeUserFromRoom(socketId: string): Promise<string | null>
71+
removeUserFromRoom(socketId: string, workflowIdHint?: string): Promise<string | null>
7172

7273
/**
7374
* Get the workflow ID for a socket

0 commit comments

Comments
 (0)