Skip to content

Commit a213ad9

Browse files
committed
improvement(rooms): redis client closed should fail fast
1 parent c6357f7 commit a213ad9

File tree

9 files changed

+114
-17
lines changed

9 files changed

+114
-17
lines changed

apps/sim/app/workspace/providers/socket-provider.tsx

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { createLogger } from '@sim/logger'
1414
import { useParams } from 'next/navigation'
1515
import { io, type Socket } from 'socket.io-client'
1616
import { getEnv } from '@/lib/core/config/env'
17+
import { useOperationQueueStore } from '@/stores/operation-queue/store'
1718

1819
const logger = createLogger('SocketContext')
1920

@@ -138,6 +139,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
138139
const [authFailed, setAuthFailed] = useState(false)
139140
const initializedRef = useRef(false)
140141
const socketRef = useRef<Socket | null>(null)
142+
const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode)
141143

142144
const params = useParams()
143145
const urlWorkflowId = params?.workflowId as string | undefined
@@ -341,9 +343,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
341343
})
342344
})
343345

344-
socketInstance.on('join-workflow-error', ({ error }) => {
346+
socketInstance.on('join-workflow-error', ({ error, code }) => {
345347
isRejoiningRef.current = false
346-
logger.error('Failed to join workflow:', error)
348+
logger.error('Failed to join workflow:', { error, code })
349+
if (code === 'ROOM_MANAGER_UNAVAILABLE') {
350+
triggerOfflineMode()
351+
}
347352
})
348353

349354
socketInstance.on('workflow-operation', (data) => {

apps/sim/socket/handlers/operations.ts

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,49 @@ import {
1212
import { persistWorkflowOperation } from '@/socket/database/operations'
1313
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
1414
import { checkRolePermission } from '@/socket/middleware/permissions'
15-
import type { IRoomManager } from '@/socket/rooms'
15+
import type { IRoomManager, UserSession } from '@/socket/rooms'
1616
import { WorkflowOperationSchema } from '@/socket/validation/schemas'
1717

1818
const logger = createLogger('OperationsHandlers')
1919

2020
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
2121
socket.on('workflow-operation', async (data) => {
22-
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
23-
const session = await roomManager.getUserSession(socket.id)
22+
if (!roomManager.isReady()) {
23+
socket.emit('operation-forbidden', {
24+
type: 'ROOM_MANAGER_UNAVAILABLE',
25+
message: 'Realtime unavailable',
26+
})
27+
if (data?.operationId) {
28+
socket.emit('operation-failed', {
29+
operationId: data.operationId,
30+
error: 'Realtime unavailable',
31+
retryable: true,
32+
})
33+
}
34+
return
35+
}
36+
37+
let workflowId: string | null = null
38+
let session: UserSession | null = null
39+
40+
try {
41+
workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
42+
session = await roomManager.getUserSession(socket.id)
43+
} catch (error) {
44+
logger.error('Error loading session for workflow operation:', error)
45+
socket.emit('operation-forbidden', {
46+
type: 'ROOM_MANAGER_UNAVAILABLE',
47+
message: 'Realtime unavailable',
48+
})
49+
if (data?.operationId) {
50+
socket.emit('operation-failed', {
51+
operationId: data.operationId,
52+
error: 'Realtime unavailable',
53+
retryable: true,
54+
})
55+
}
56+
return
57+
}
2458

2559
if (!workflowId || !session) {
2660
socket.emit('operation-forbidden', {

apps/sim/socket/handlers/subblocks.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,21 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
4848
operationId,
4949
} = data
5050

51+
if (!roomManager.isReady()) {
52+
socket.emit('operation-forbidden', {
53+
type: 'ROOM_MANAGER_UNAVAILABLE',
54+
message: 'Realtime unavailable',
55+
})
56+
if (operationId) {
57+
socket.emit('operation-failed', {
58+
operationId,
59+
error: 'Realtime unavailable',
60+
retryable: true,
61+
})
62+
}
63+
return
64+
}
65+
5166
try {
5267
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
5368
const session = await roomManager.getUserSession(socket.id)

apps/sim/socket/handlers/variables.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,21 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
3737
socket.on('variable-update', async (data) => {
3838
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data
3939

40+
if (!roomManager.isReady()) {
41+
socket.emit('operation-forbidden', {
42+
type: 'ROOM_MANAGER_UNAVAILABLE',
43+
message: 'Realtime unavailable',
44+
})
45+
if (operationId) {
46+
socket.emit('operation-failed', {
47+
operationId,
48+
error: 'Realtime unavailable',
49+
retryable: true,
50+
})
51+
}
52+
return
53+
}
54+
4055
try {
4156
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
4257
const session = await roomManager.getUserSession(socket.id)

apps/sim/socket/handlers/workflow.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
2020
return
2121
}
2222

23+
if (!roomManager.isReady()) {
24+
logger.warn(`Join workflow rejected: Room manager unavailable`)
25+
socket.emit('join-workflow-error', {
26+
error: 'Realtime unavailable',
27+
code: 'ROOM_MANAGER_UNAVAILABLE',
28+
})
29+
return
30+
}
31+
2332
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
2433

2534
// Verify workflow access
@@ -128,12 +137,19 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
128137
// Undo socket.join and room manager entry if any operation failed
129138
socket.leave(workflowId)
130139
await roomManager.removeUserFromRoom(socket.id)
131-
socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
140+
socket.emit('join-workflow-error', {
141+
error: roomManager.isReady() ? 'Failed to join workflow' : 'Realtime unavailable',
142+
code: roomManager.isReady() ? undefined : 'ROOM_MANAGER_UNAVAILABLE',
143+
})
132144
}
133145
})
134146

135147
socket.on('leave-workflow', async () => {
136148
try {
149+
if (!roomManager.isReady()) {
150+
return
151+
}
152+
137153
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
138154
const session = await roomManager.getUserSession(socket.id)
139155

apps/sim/socket/rooms/memory-manager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ export class MemoryRoomManager implements IRoomManager {
2626
logger.info('MemoryRoomManager initialized (single-pod mode)')
2727
}
2828

29+
isReady(): boolean {
30+
return true
31+
}
32+
2933
async shutdown(): Promise<void> {
3034
this.workflowRooms.clear()
3135
this.socketToWorkflow.clear()

apps/sim/socket/rooms/redis-manager.ts

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,6 @@ export class RedisRoomManager implements IRoomManager {
9696
this._io = io
9797
this.redis = createClient({
9898
url: redisUrl,
99-
socket: {
100-
reconnectStrategy: (retries) => {
101-
if (retries > 10) {
102-
logger.error('Redis reconnection failed after 10 attempts')
103-
return new Error('Redis reconnection failed')
104-
}
105-
const delay = Math.min(retries * 100, 3000)
106-
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
107-
return delay
108-
},
109-
},
11099
})
111100

112101
this.redis.on('error', (err) => {
@@ -122,12 +111,21 @@ export class RedisRoomManager implements IRoomManager {
122111
logger.info('Redis client ready')
123112
this.isConnected = true
124113
})
114+
115+
this.redis.on('end', () => {
116+
logger.warn('Redis client connection closed')
117+
this.isConnected = false
118+
})
125119
}
126120

127121
get io(): Server {
128122
return this._io
129123
}
130124

125+
isReady(): boolean {
126+
return this.isConnected
127+
}
128+
131129
async initialize(): Promise<void> {
132130
if (this.isConnected) return
133131

apps/sim/socket/rooms/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ export interface IRoomManager {
4848
*/
4949
initialize(): Promise<void>
5050

51+
/**
52+
* Whether the room manager is ready to serve requests
53+
*/
54+
isReady(): boolean
55+
5156
/**
5257
* Clean shutdown
5358
*/

apps/sim/socket/routes/http.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
8585
res.end(JSON.stringify({ error: authResult.error }))
8686
return
8787
}
88+
89+
if (!roomManager.isReady()) {
90+
sendError(res, 'Room manager unavailable', 503)
91+
return
92+
}
8893
}
8994

9095
// Handle workflow deletion notifications from the main API

0 commit comments

Comments
 (0)