Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apps/sim/serializer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ function shouldSerializeSubBlock(
: group.basicId === subBlockConfig.id
return matchesMode && evaluateSubBlockCondition(subBlockConfig.condition, values)
}
console.log('[FUCK] subBlockConfig.condition', subBlockConfig.condition, values)
return evaluateSubBlockCondition(subBlockConfig.condition, values)
}

Expand Down
3 changes: 2 additions & 1 deletion apps/sim/socket/handlers/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ export function setupConnectionHandlers(socket: AuthenticatedSocket, roomManager
cleanupPendingSubblocksForSocket(socket.id)
cleanupPendingVariablesForSocket(socket.id)

const workflowId = await roomManager.removeUserFromRoom(socket.id)
const workflowIdHint = [...socket.rooms].find((roomId) => roomId !== socket.id)
const workflowId = await roomManager.removeUserFromRoom(socket.id, workflowIdHint)

if (workflowId) {
await roomManager.broadcastPresenceUpdate(workflowId)
Expand Down
64 changes: 52 additions & 12 deletions apps/sim/socket/handlers/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,66 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
const currentWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
if (currentWorkflowId) {
socket.leave(currentWorkflowId)
await roomManager.removeUserFromRoom(socket.id)
await roomManager.removeUserFromRoom(socket.id, currentWorkflowId)
await roomManager.broadcastPresenceUpdate(currentWorkflowId)
}

const STALE_THRESHOLD_MS = 60_000
// Keep this above Redis socket key TTL (1h) so a normal idle user is not evicted too aggressively.
const STALE_THRESHOLD_MS = 75 * 60 * 1000
const now = Date.now()
const existingUsers = await roomManager.getWorkflowUsers(workflowId)
let liveSocketIds = new Set<string>()
let canCheckLiveness = false

try {
const liveSockets = await roomManager.io.in(workflowId).fetchSockets()
liveSocketIds = new Set(liveSockets.map((liveSocket) => liveSocket.id))
canCheckLiveness = true
} catch (error) {
logger.warn(
`Skipping stale cleanup for ${workflowId} due to live socket lookup failure`,
error
)
}

for (const existingUser of existingUsers) {
if (existingUser.userId === userId && existingUser.socketId !== socket.id) {
const isSameTab = tabSessionId && existingUser.tabSessionId === tabSessionId
const isStale =
now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS
try {
if (existingUser.socketId === socket.id) {
continue
}

if (isSameTab || isStale) {
const isSameTab = Boolean(
existingUser.userId === userId &&
tabSessionId &&
existingUser.tabSessionId === tabSessionId
)

if (isSameTab) {
logger.info(
`Cleaning up socket ${existingUser.socketId} for user ${userId} (${isSameTab ? 'same tab' : 'stale'})`
`Cleaning up socket ${existingUser.socketId} for user ${existingUser.userId} (same tab)`
)
await roomManager.removeUserFromRoom(existingUser.socketId)
roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
await roomManager.removeUserFromRoom(existingUser.socketId, workflowId)
await roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
continue
}

if (!canCheckLiveness || liveSocketIds.has(existingUser.socketId)) {
continue
}

const isStaleByActivity =
now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS
if (!isStaleByActivity) {
continue
}

logger.info(
`Cleaning up socket ${existingUser.socketId} for user ${existingUser.userId} (stale activity)`
)
await roomManager.removeUserFromRoom(existingUser.socketId, workflowId)
await roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
} catch (error) {
logger.warn(`Best-effort cleanup failed for socket ${existingUser.socketId}`, error)
}
}

Expand Down Expand Up @@ -136,7 +176,7 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
logger.error('Error joining workflow:', error)
// Undo socket.join and room manager entry if any operation failed
socket.leave(workflowId)
await roomManager.removeUserFromRoom(socket.id)
await roomManager.removeUserFromRoom(socket.id, workflowId)
const isReady = roomManager.isReady()
socket.emit('join-workflow-error', {
error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
Expand All @@ -156,7 +196,7 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:

if (workflowId && session) {
socket.leave(workflowId)
await roomManager.removeUserFromRoom(socket.id)
await roomManager.removeUserFromRoom(socket.id, workflowId)
await roomManager.broadcastPresenceUpdate(workflowId)

logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`)
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/socket/rooms/memory-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class MemoryRoomManager implements IRoomManager {
logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`)
}

async removeUserFromRoom(socketId: string): Promise<string | null> {
async removeUserFromRoom(socketId: string, _workflowIdHint?: string): Promise<string | null> {
const workflowId = this.socketToWorkflow.get(socketId)

if (!workflowId) {
Expand Down
52 changes: 43 additions & 9 deletions apps/sim/socket/rooms/redis-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ const KEYS = {
workflowMeta: (wfId: string) => `workflow:${wfId}:meta`,
socketWorkflow: (socketId: string) => `socket:${socketId}:workflow`,
socketSession: (socketId: string) => `socket:${socketId}:session`,
socketPresenceWorkflow: (socketId: string) => `socket:${socketId}:presence-workflow`,
} as const

const SOCKET_KEY_TTL = 3600
const SOCKET_PRESENCE_WORKFLOW_KEY_TTL = 24 * 60 * 60

/**
* Lua script for atomic user removal from room.
Expand All @@ -22,11 +24,21 @@ const SOCKET_KEY_TTL = 3600
const REMOVE_USER_SCRIPT = `
local socketWorkflowKey = KEYS[1]
local socketSessionKey = KEYS[2]
local socketPresenceWorkflowKey = KEYS[3]
local workflowUsersPrefix = ARGV[1]
local workflowMetaPrefix = ARGV[2]
local socketId = ARGV[3]
local workflowIdHint = ARGV[4]

local workflowId = redis.call('GET', socketWorkflowKey)
if not workflowId then
workflowId = redis.call('GET', socketPresenceWorkflowKey)
end

if not workflowId and workflowIdHint ~= '' then
workflowId = workflowIdHint
end

if not workflowId then
return nil
end
Expand All @@ -35,7 +47,7 @@ local workflowUsersKey = workflowUsersPrefix .. workflowId .. ':users'
local workflowMetaKey = workflowMetaPrefix .. workflowId .. ':meta'

redis.call('HDEL', workflowUsersKey, socketId)
redis.call('DEL', socketWorkflowKey, socketSessionKey)
redis.call('DEL', socketWorkflowKey, socketSessionKey, socketPresenceWorkflowKey)

local remaining = redis.call('HLEN', workflowUsersKey)
if remaining == 0 then
Expand All @@ -54,11 +66,13 @@ const UPDATE_ACTIVITY_SCRIPT = `
local workflowUsersKey = KEYS[1]
local socketWorkflowKey = KEYS[2]
local socketSessionKey = KEYS[3]
local socketPresenceWorkflowKey = KEYS[4]
local socketId = ARGV[1]
local cursorJson = ARGV[2]
local selectionJson = ARGV[3]
local lastActivity = ARGV[4]
local ttl = tonumber(ARGV[5])
local presenceWorkflowTtl = tonumber(ARGV[6])

local existingJson = redis.call('HGET', workflowUsersKey, socketId)
if not existingJson then
Expand All @@ -78,6 +92,7 @@ existing.lastActivity = tonumber(lastActivity)
redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing))
redis.call('EXPIRE', socketWorkflowKey, ttl)
redis.call('EXPIRE', socketSessionKey, ttl)
redis.call('EXPIRE', socketPresenceWorkflowKey, presenceWorkflowTtl)
return 1
`

Expand Down Expand Up @@ -164,6 +179,8 @@ export class RedisRoomManager implements IRoomManager {
pipeline.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString())
pipeline.set(KEYS.socketWorkflow(socketId), workflowId)
pipeline.expire(KEYS.socketWorkflow(socketId), SOCKET_KEY_TTL)
pipeline.set(KEYS.socketPresenceWorkflow(socketId), workflowId)
pipeline.expire(KEYS.socketPresenceWorkflow(socketId), SOCKET_PRESENCE_WORKFLOW_KEY_TTL)
pipeline.hSet(KEYS.socketSession(socketId), {
userId: presence.userId,
userName: presence.userName,
Expand All @@ -187,35 +204,50 @@ export class RedisRoomManager implements IRoomManager {
}
}

async removeUserFromRoom(socketId: string, retried = false): Promise<string | null> {
async removeUserFromRoom(
socketId: string,
workflowIdHint?: string,
retried = false
): Promise<string | null> {
if (!this.removeUserScriptSha) {
logger.error('removeUserFromRoom called before initialize()')
return null
}

try {
const workflowId = await this.redis.evalSha(this.removeUserScriptSha, {
keys: [KEYS.socketWorkflow(socketId), KEYS.socketSession(socketId)],
arguments: ['workflow:', 'workflow:', socketId],
keys: [
KEYS.socketWorkflow(socketId),
KEYS.socketSession(socketId),
KEYS.socketPresenceWorkflow(socketId),
],
arguments: ['workflow:', 'workflow:', socketId, workflowIdHint ?? ''],
})

if (workflowId) {
if (typeof workflowId === 'string' && workflowId.length > 0) {
logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`)
return workflowId
}
return workflowId as string | null

return null
} catch (error) {
if ((error as Error).message?.includes('NOSCRIPT') && !retried) {
logger.warn('Lua script not found, reloading...')
this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT)
return this.removeUserFromRoom(socketId, true)
return this.removeUserFromRoom(socketId, workflowIdHint, true)
}
logger.error(`Failed to remove user from room: ${socketId}`, error)
return null
}
}

async getWorkflowIdForSocket(socketId: string): Promise<string | null> {
return this.redis.get(KEYS.socketWorkflow(socketId))
const workflowId = await this.redis.get(KEYS.socketWorkflow(socketId))
if (workflowId) {
return workflowId
}

return this.redis.get(KEYS.socketPresenceWorkflow(socketId))
}

async getUserSession(socketId: string): Promise<UserSession | null> {
Expand Down Expand Up @@ -278,13 +310,15 @@ export class RedisRoomManager implements IRoomManager {
KEYS.workflowUsers(workflowId),
KEYS.socketWorkflow(socketId),
KEYS.socketSession(socketId),
KEYS.socketPresenceWorkflow(socketId),
],
arguments: [
socketId,
updates.cursor !== undefined ? JSON.stringify(updates.cursor) : '',
updates.selection !== undefined ? JSON.stringify(updates.selection) : '',
(updates.lastActivity ?? Date.now()).toString(),
SOCKET_KEY_TTL.toString(),
SOCKET_PRESENCE_WORKFLOW_KEY_TTL.toString(),
],
})
} catch (error) {
Expand Down Expand Up @@ -348,7 +382,7 @@ export class RedisRoomManager implements IRoomManager {

// Remove all users from Redis state
for (const user of users) {
await this.removeUserFromRoom(user.socketId)
await this.removeUserFromRoom(user.socketId, workflowId)
}

// Clean up room data
Expand Down
5 changes: 3 additions & 2 deletions apps/sim/socket/rooms/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ export interface IRoomManager {

/**
* Remove a user from their current room
* Returns the workflowId they were in, or null if not in any room
* Optional workflowIdHint is used when socket mapping keys are missing/expired.
* Returns the workflowId they were in, or null if not in any room.
*/
removeUserFromRoom(socketId: string): Promise<string | null>
removeUserFromRoom(socketId: string, workflowIdHint?: string): Promise<string | null>

/**
* Get the workflow ID for a socket
Expand Down