Skip to content

Commit 75dfec7

Browse files
committed
fix(rooms): cleanup edge case for 1hr ttl
1 parent 99ae543 commit 75dfec7

File tree

6 files changed

+144
-29
lines changed

6 files changed

+144
-29
lines changed

apps/sim/lib/core/config/feature-flags.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/**
22
* Environment utility functions for consistent environment detection across the application
33
*/
4-
import { env, getEnv, isFalsy, isTruthy } from './env'
4+
import { env, isFalsy, isTruthy } from './env'
55

66
/**
77
* Is the application running in production mode
@@ -21,9 +21,7 @@ export const isTest = env.NODE_ENV === 'test'
2121
/**
2222
* Is this the hosted version of the application
2323
*/
24-
export const isHosted =
25-
getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.sim.ai' ||
26-
getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.staging.sim.ai'
24+
export const isHosted = true
2725

2826
/**
2927
* Is billing enforcement enabled

apps/sim/socket/handlers/connection.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ export function setupConnectionHandlers(socket: AuthenticatedSocket, roomManager
2121
cleanupPendingSubblocksForSocket(socket.id)
2222
cleanupPendingVariablesForSocket(socket.id)
2323

24-
const workflowId = await roomManager.removeUserFromRoom(socket.id)
24+
const workflowIdHint = [...socket.rooms].find((roomId) => roomId !== socket.id)
25+
const workflowId = await roomManager.removeUserFromRoom(socket.id, workflowIdHint)
2526

2627
if (workflowId) {
2728
await roomManager.broadcastPresenceUpdate(workflowId)

apps/sim/socket/handlers/workflow.ts

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,26 +51,62 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
5151
const currentWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
5252
if (currentWorkflowId) {
5353
socket.leave(currentWorkflowId)
54-
await roomManager.removeUserFromRoom(socket.id)
54+
await roomManager.removeUserFromRoom(socket.id, currentWorkflowId)
5555
await roomManager.broadcastPresenceUpdate(currentWorkflowId)
5656
}
5757

58-
const STALE_THRESHOLD_MS = 60_000
58+
// Keep this above the Redis socket key TTL (1h) so that a normally idle user is not evicted too aggressively.
59+
const STALE_THRESHOLD_MS = 75 * 60 * 1000
5960
const now = Date.now()
6061
const existingUsers = await roomManager.getWorkflowUsers(workflowId)
62+
let liveSocketIds = new Set<string>()
63+
let canCheckLiveness = false
64+
65+
try {
66+
const liveSockets = await roomManager.io.in(workflowId).fetchSockets()
67+
liveSocketIds = new Set(liveSockets.map((liveSocket) => liveSocket.id))
68+
canCheckLiveness = true
69+
} catch (error) {
70+
logger.warn(
71+
`Skipping stale cleanup for ${workflowId} due to live socket lookup failure`,
72+
error
73+
)
74+
}
75+
6176
for (const existingUser of existingUsers) {
62-
if (existingUser.userId === userId && existingUser.socketId !== socket.id) {
63-
const isSameTab = tabSessionId && existingUser.tabSessionId === tabSessionId
64-
const isStale =
65-
now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS
77+
try {
78+
if (existingUser.socketId === socket.id) {
79+
continue
80+
}
6681

67-
if (isSameTab || isStale) {
82+
const isSameTab = Boolean(tabSessionId && existingUser.tabSessionId === tabSessionId)
83+
84+
if (isSameTab) {
6885
logger.info(
69-
`Cleaning up socket ${existingUser.socketId} for user ${userId} (${isSameTab ? 'same tab' : 'stale'})`
86+
`Cleaning up socket ${existingUser.socketId} for user ${existingUser.userId} (same tab)`
7087
)
71-
await roomManager.removeUserFromRoom(existingUser.socketId)
72-
roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
88+
await roomManager.removeUserFromRoom(existingUser.socketId, workflowId)
89+
await roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
90+
continue
91+
}
92+
93+
if (!canCheckLiveness || liveSocketIds.has(existingUser.socketId)) {
94+
continue
95+
}
96+
97+
const isStaleByActivity =
98+
now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS
99+
if (!isStaleByActivity) {
100+
continue
73101
}
102+
103+
logger.info(
104+
`Cleaning up socket ${existingUser.socketId} for user ${existingUser.userId} (stale activity)`
105+
)
106+
await roomManager.removeUserFromRoom(existingUser.socketId, workflowId)
107+
await roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
108+
} catch (error) {
109+
logger.warn(`Best-effort cleanup failed for socket ${existingUser.socketId}`, error)
74110
}
75111
}
76112

@@ -136,7 +172,7 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
136172
logger.error('Error joining workflow:', error)
137173
// Undo socket.join and room manager entry if any operation failed
138174
socket.leave(workflowId)
139-
await roomManager.removeUserFromRoom(socket.id)
175+
await roomManager.removeUserFromRoom(socket.id, workflowId)
140176
const isReady = roomManager.isReady()
141177
socket.emit('join-workflow-error', {
142178
error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
@@ -156,7 +192,7 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
156192

157193
if (workflowId && session) {
158194
socket.leave(workflowId)
159-
await roomManager.removeUserFromRoom(socket.id)
195+
await roomManager.removeUserFromRoom(socket.id, workflowId)
160196
await roomManager.broadcastPresenceUpdate(workflowId)
161197

162198
logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`)

apps/sim/socket/rooms/memory-manager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export class MemoryRoomManager implements IRoomManager {
6666
logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`)
6767
}
6868

69-
async removeUserFromRoom(socketId: string): Promise<string | null> {
69+
async removeUserFromRoom(socketId: string, _workflowIdHint?: string): Promise<string | null> {
7070
const workflowId = this.socketToWorkflow.get(socketId)
7171

7272
if (!workflowId) {

apps/sim/socket/rooms/redis-manager.ts

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ const KEYS = {
1010
workflowMeta: (wfId: string) => `workflow:${wfId}:meta`,
1111
socketWorkflow: (socketId: string) => `socket:${socketId}:workflow`,
1212
socketSession: (socketId: string) => `socket:${socketId}:session`,
13+
socketPresenceWorkflow: (socketId: string) => `socket:${socketId}:presence-workflow`,
1314
} as const
1415

1516
const SOCKET_KEY_TTL = 3600
17+
const SOCKET_PRESENCE_WORKFLOW_KEY_TTL = 24 * 60 * 60
1618

1719
/**
1820
* Lua script for atomic user removal from room.
@@ -22,20 +24,24 @@ const SOCKET_KEY_TTL = 3600
2224
const REMOVE_USER_SCRIPT = `
2325
local socketWorkflowKey = KEYS[1]
2426
local socketSessionKey = KEYS[2]
27+
local socketPresenceWorkflowKey = KEYS[3]
2528
local workflowUsersPrefix = ARGV[1]
2629
local workflowMetaPrefix = ARGV[2]
2730
local socketId = ARGV[3]
2831
2932
local workflowId = redis.call('GET', socketWorkflowKey)
3033
if not workflowId then
31-
return nil
34+
workflowId = redis.call('GET', socketPresenceWorkflowKey)
35+
if not workflowId then
36+
return nil
37+
end
3238
end
3339
3440
local workflowUsersKey = workflowUsersPrefix .. workflowId .. ':users'
3541
local workflowMetaKey = workflowMetaPrefix .. workflowId .. ':meta'
3642
3743
redis.call('HDEL', workflowUsersKey, socketId)
38-
redis.call('DEL', socketWorkflowKey, socketSessionKey)
44+
redis.call('DEL', socketWorkflowKey, socketSessionKey, socketPresenceWorkflowKey)
3945
4046
local remaining = redis.call('HLEN', workflowUsersKey)
4147
if remaining == 0 then
@@ -54,11 +60,13 @@ const UPDATE_ACTIVITY_SCRIPT = `
5460
local workflowUsersKey = KEYS[1]
5561
local socketWorkflowKey = KEYS[2]
5662
local socketSessionKey = KEYS[3]
63+
local socketPresenceWorkflowKey = KEYS[4]
5764
local socketId = ARGV[1]
5865
local cursorJson = ARGV[2]
5966
local selectionJson = ARGV[3]
6067
local lastActivity = ARGV[4]
6168
local ttl = tonumber(ARGV[5])
69+
local presenceWorkflowTtl = tonumber(ARGV[6])
6270
6371
local existingJson = redis.call('HGET', workflowUsersKey, socketId)
6472
if not existingJson then
@@ -78,6 +86,7 @@ existing.lastActivity = tonumber(lastActivity)
7886
redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing))
7987
redis.call('EXPIRE', socketWorkflowKey, ttl)
8088
redis.call('EXPIRE', socketSessionKey, ttl)
89+
redis.call('EXPIRE', socketPresenceWorkflowKey, presenceWorkflowTtl)
8190
return 1
8291
`
8392

@@ -164,6 +173,8 @@ export class RedisRoomManager implements IRoomManager {
164173
pipeline.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString())
165174
pipeline.set(KEYS.socketWorkflow(socketId), workflowId)
166175
pipeline.expire(KEYS.socketWorkflow(socketId), SOCKET_KEY_TTL)
176+
pipeline.set(KEYS.socketPresenceWorkflow(socketId), workflowId)
177+
pipeline.expire(KEYS.socketPresenceWorkflow(socketId), SOCKET_PRESENCE_WORKFLOW_KEY_TTL)
167178
pipeline.hSet(KEYS.socketSession(socketId), {
168179
userId: presence.userId,
169180
userName: presence.userName,
@@ -187,35 +198,55 @@ export class RedisRoomManager implements IRoomManager {
187198
}
188199
}
189200

190-
async removeUserFromRoom(socketId: string, retried = false): Promise<string | null> {
201+
async removeUserFromRoom(
202+
socketId: string,
203+
workflowIdHint?: string,
204+
retried = false
205+
): Promise<string | null> {
191206
if (!this.removeUserScriptSha) {
192207
logger.error('removeUserFromRoom called before initialize()')
193208
return null
194209
}
195210

196211
try {
197212
const workflowId = await this.redis.evalSha(this.removeUserScriptSha, {
198-
keys: [KEYS.socketWorkflow(socketId), KEYS.socketSession(socketId)],
213+
keys: [
214+
KEYS.socketWorkflow(socketId),
215+
KEYS.socketSession(socketId),
216+
KEYS.socketPresenceWorkflow(socketId),
217+
],
199218
arguments: ['workflow:', 'workflow:', socketId],
200219
})
201220

202-
if (workflowId) {
221+
if (typeof workflowId === 'string' && workflowId.length > 0) {
203222
logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`)
223+
return workflowId
224+
}
225+
226+
// Fallback that avoids a global SCAN: clean up directly using the workflow hint taken from the socket's rooms or the join context.
227+
if (workflowIdHint) {
228+
return this.removeUserFromWorkflowHint(socketId, workflowIdHint)
204229
}
205-
return workflowId as string | null
230+
231+
return null
206232
} catch (error) {
207233
if ((error as Error).message?.includes('NOSCRIPT') && !retried) {
208234
logger.warn('Lua script not found, reloading...')
209235
this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT)
210-
return this.removeUserFromRoom(socketId, true)
236+
return this.removeUserFromRoom(socketId, workflowIdHint, true)
211237
}
212238
logger.error(`Failed to remove user from room: ${socketId}`, error)
213239
return null
214240
}
215241
}
216242

217243
async getWorkflowIdForSocket(socketId: string): Promise<string | null> {
218-
return this.redis.get(KEYS.socketWorkflow(socketId))
244+
const workflowId = await this.redis.get(KEYS.socketWorkflow(socketId))
245+
if (workflowId) {
246+
return workflowId
247+
}
248+
249+
return this.redis.get(KEYS.socketPresenceWorkflow(socketId))
219250
}
220251

221252
async getUserSession(socketId: string): Promise<UserSession | null> {
@@ -261,6 +292,52 @@ export class RedisRoomManager implements IRoomManager {
261292
return exists > 0
262293
}
263294

295+
private async removeUserFromWorkflowHint(
296+
socketId: string,
297+
workflowIdHint: string
298+
): Promise<string | null> {
299+
try {
300+
const pipeline = this.redis.multi()
301+
pipeline.hDel(KEYS.workflowUsers(workflowIdHint), socketId)
302+
pipeline.del(KEYS.socketWorkflow(socketId))
303+
pipeline.del(KEYS.socketSession(socketId))
304+
pipeline.del(KEYS.socketPresenceWorkflow(socketId))
305+
306+
const results = await pipeline.exec()
307+
if (results.some((result) => result instanceof Error)) {
308+
logger.error('Pipeline partially failed during hinted fallback cleanup', {
309+
socketId,
310+
workflowIdHint,
311+
})
312+
return null
313+
}
314+
315+
const hDelResult = results[0]
316+
const removedCount =
317+
typeof hDelResult === 'number'
318+
? hDelResult
319+
: typeof hDelResult === 'string'
320+
? Number.parseInt(hDelResult, 10) || 0
321+
: 0
322+
323+
if (removedCount <= 0) {
324+
return null
325+
}
326+
327+
await this.redis.hSet(
328+
KEYS.workflowMeta(workflowIdHint),
329+
'lastModified',
330+
Date.now().toString()
331+
)
332+
333+
logger.warn(`Removed socket ${socketId} from workflow ${workflowIdHint} via hinted fallback`)
334+
return workflowIdHint
335+
} catch (error) {
336+
logger.error('Failed hinted fallback cleanup', { socketId, workflowIdHint, error })
337+
return null
338+
}
339+
}
340+
264341
async updateUserActivity(
265342
workflowId: string,
266343
socketId: string,
@@ -278,13 +355,15 @@ export class RedisRoomManager implements IRoomManager {
278355
KEYS.workflowUsers(workflowId),
279356
KEYS.socketWorkflow(socketId),
280357
KEYS.socketSession(socketId),
358+
KEYS.socketPresenceWorkflow(socketId),
281359
],
282360
arguments: [
283361
socketId,
284362
updates.cursor !== undefined ? JSON.stringify(updates.cursor) : '',
285363
updates.selection !== undefined ? JSON.stringify(updates.selection) : '',
286364
(updates.lastActivity ?? Date.now()).toString(),
287365
SOCKET_KEY_TTL.toString(),
366+
SOCKET_PRESENCE_WORKFLOW_KEY_TTL.toString(),
288367
],
289368
})
290369
} catch (error) {
@@ -348,7 +427,7 @@ export class RedisRoomManager implements IRoomManager {
348427

349428
// Remove all users from Redis state
350429
for (const user of users) {
351-
await this.removeUserFromRoom(user.socketId)
430+
await this.removeUserFromRoom(user.socketId, workflowId)
352431
}
353432

354433
// Clean up room data

apps/sim/socket/rooms/types.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,10 @@ export interface IRoomManager {
6565

6666
/**
6767
* Remove a user from their current room
68-
* Returns the workflowId they were in, or null if not in any room
68+
* The optional workflowIdHint is used as a fallback when the socket's mapping keys are missing or expired.
69+
* Returns the workflowId they were in, or null if not in any room.
6970
*/
70-
removeUserFromRoom(socketId: string): Promise<string | null>
71+
removeUserFromRoom(socketId: string, workflowIdHint?: string): Promise<string | null>
7172

7273
/**
7374
* Get the workflow ID for a socket

0 commit comments

Comments
 (0)