Skip to content

Commit 66fca0f

Browse files
committed
fix(mothership): reconcile stuck conversation_id against Redis lock to clear stuck-yellow task tiles
copilot_chats.conversation_id has no TTL/heartbeat, so when a stream process dies before the clear path runs (pod OOM, SIGKILL, uncaught throw, deploy mid-stream) the column is orphaned and the task tile renders yellow forever. The Redis lock at copilot:chat-stream-lock:<chatId> is the canonical liveness signal and self-heals via 60s TTL + 20s heartbeat, but the mothership APIs weren't consulting it. Adds read-time reconciliation: a batched MGET helper checks whether each persisted conversation_id still has a live Redis lock, and both GET /api/mothership/chats and GET /api/mothership/chats/[chatId] rewrite the marker to null when the lock has expired. No DB writes; stuck rows self-heal on next fetch.
1 parent d895e0e commit 66fca0f

6 files changed

Lines changed: 494 additions & 11 deletions

File tree

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { copilotHttpMock, copilotHttpMockFns } from '@sim/testing'
5+
import { NextRequest } from 'next/server'
6+
import { beforeEach, describe, expect, it, vi } from 'vitest'
7+
8+
// Mock functions shared between the vi.mock factories below and the test
// bodies. They are created through vi.hoisted so they already exist when the
// module factories that reference them are evaluated.
const {
9+
mockGetAccessibleCopilotChat,
10+
mockGetActiveChatStreamIds,
11+
mockReadEvents,
12+
mockReadFilePreviewSessions,
13+
mockGetLatestRunForStream,
14+
} = vi.hoisted(() => ({
15+
mockGetAccessibleCopilotChat: vi.fn(),
16+
mockGetActiveChatStreamIds: vi.fn(),
17+
mockReadEvents: vi.fn(),
18+
mockReadFilePreviewSessions: vi.fn(),
19+
mockGetLatestRunForStream: vi.fn(),
20+
}))
21+
22+
// Database layer stubs: `db` is an empty object, so none of the tests below
// can issue a real query through it.
vi.mock('@sim/db', () => ({ db: {} }))
23+
24+
// Schema columns are replaced with plain string placeholders.
vi.mock('@sim/db/schema', () => ({
25+
copilotChats: {
26+
id: 'copilotChats.id',
27+
userId: 'copilotChats.userId',
28+
type: 'copilotChats.type',
29+
updatedAt: 'copilotChats.updatedAt',
30+
lastSeenAt: 'copilotChats.lastSeenAt',
31+
},
32+
}))
33+
34+
// Query-builder helpers return inert descriptor objects that simply record
// their arguments; `sql` is callable as a tagged template and also exposes a
// mocked `raw`.
vi.mock('drizzle-orm', () => ({
35+
and: vi.fn((...conditions: unknown[]) => ({ type: 'and', conditions })),
36+
eq: vi.fn((field: unknown, value: unknown) => ({ type: 'eq', field, value })),
37+
sql: Object.assign(
38+
vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({
39+
type: 'sql',
40+
strings,
41+
values,
42+
})),
43+
{ raw: vi.fn() }
44+
),
45+
}))
46+
47+
// HTTP helpers come from the shared testing mock; the auth function on
// copilotHttpMockFns is configured per test in the describe block.
vi.mock('@/lib/copilot/request/http', () => copilotHttpMock)
47+
48+
// The modules below are wired to the hoisted mock functions so each test can
// control the chat lookup, the active-stream lookup, and the stream snapshot
// reads independently.
vi.mock('@/lib/copilot/chat/lifecycle', () => ({
49+
getAccessibleCopilotChat: mockGetAccessibleCopilotChat,
50+
}))
51+
52+
vi.mock('@/lib/copilot/request/session/abort', () => ({
53+
getActiveChatStreamIds: mockGetActiveChatStreamIds,
54+
}))
55+
56+
vi.mock('@/lib/copilot/request/session/buffer', () => ({
57+
readEvents: mockReadEvents,
58+
}))
59+
60+
vi.mock('@/lib/copilot/request/session/file-preview-session', () => ({
61+
readFilePreviewSessions: mockReadFilePreviewSessions,
62+
}))
63+
64+
vi.mock('@/lib/copilot/async-runs/repository', () => ({
65+
getLatestRunForStream: mockGetLatestRunForStream,
66+
}))
67+
68+
// Pass-through stubs: events, transcripts, and messages are returned
// unchanged so assertions see exactly what the mocks supplied.
vi.mock('@/lib/copilot/request/session/types', () => ({
69+
toStreamBatchEvent: (e: unknown) => e,
70+
}))
71+
72+
vi.mock('@/lib/copilot/chat/effective-transcript', () => ({
73+
buildEffectiveChatTranscript: ({ messages }: { messages: unknown[] }) => messages,
74+
}))
75+
76+
vi.mock('@/lib/copilot/chat/persisted-message', () => ({
77+
normalizeMessage: (m: unknown) => m,
78+
}))
79+
80+
// Side-effect sinks (task status pub/sub and analytics capture) are replaced
// with bare mocks.
vi.mock('@/lib/copilot/tasks', () => ({
81+
taskPubSub: { publishStatusChanged: vi.fn() },
82+
}))
83+
84+
vi.mock('@/lib/posthog/server', () => ({
85+
captureServerEvent: vi.fn(),
86+
}))
88+
89+
import { GET } from '@/app/api/mothership/chats/[chatId]/route'
90+
91+
/**
 * Builds the second argument passed to the GET handler in these tests:
 * an object whose `params` is an already-resolved Promise carrying `chatId`.
 */
function makeContext(chatId: string) {
92+
return { params: Promise.resolve({ chatId }) }
93+
}
94+
95+
/**
 * Builds a GET NextRequest addressed to the chat-detail URL for `chatId`
 * on localhost:3000.
 */
function createRequest(chatId: string) {
96+
return new NextRequest(`http://localhost:3000/api/mothership/chats/${chatId}`, {
97+
method: 'GET',
98+
})
99+
}
100+
101+
// Covers how the chat-detail GET handler reports `conversationId` depending on
// whether the active-stream lookup (getActiveChatStreamIds) still lists the
// chat, plus the 404 and 401 paths.
describe('GET /api/mothership/chats/[chatId]', () => {
102+
// Defaults for every test: authenticated as user-1, no active streams,
// no buffered events, no preview sessions, and no run record.
beforeEach(() => {
103+
vi.clearAllMocks()
104+
copilotHttpMockFns.mockAuthenticateCopilotRequestSessionOnly.mockResolvedValue({
105+
userId: 'user-1',
106+
isAuthenticated: true,
107+
})
108+
mockGetActiveChatStreamIds.mockResolvedValue(new Set<string>())
109+
mockReadEvents.mockResolvedValue([])
110+
mockReadFilePreviewSessions.mockResolvedValue([])
111+
mockGetLatestRunForStream.mockResolvedValue(null)
112+
})
113+
114+
// The chat row still carries a conversationId, but the active-stream lookup
// returns an empty set: the response must report null, carry no stream
// snapshot, and never read stream events.
it('clears conversationId when the redis lock has expired (stuck-yellow bug)', async () => {
115+
mockGetAccessibleCopilotChat.mockResolvedValueOnce({
116+
id: 'chat-stuck',
117+
type: 'mothership',
118+
title: 'Stuck',
119+
messages: [],
120+
resources: [],
121+
conversationId: 'stream-orphaned',
122+
createdAt: new Date('2026-05-11T12:00:00Z'),
123+
updatedAt: new Date('2026-05-11T12:00:00Z'),
124+
})
125+
mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set<string>())
126+
127+
const response = await GET(createRequest('chat-stuck'), makeContext('chat-stuck'))
128+
expect(response.status).toBe(200)
129+
const body = await response.json()
130+
131+
// The lookup is keyed by chat id, not by the persisted stream id.
expect(mockGetActiveChatStreamIds).toHaveBeenCalledWith(['chat-stuck'])
132+
expect(body.success).toBe(true)
133+
expect(body.chat.conversationId).toBeNull()
134+
expect(body.chat.streamSnapshot).toBeUndefined()
135+
expect(mockReadEvents).not.toHaveBeenCalled()
136+
})
137+
138+
// The active-stream lookup lists the chat: the persisted conversationId is
// returned as-is and a stream snapshot is built from the mocked run.
it('returns the live conversationId when redis confirms the lock', async () => {
139+
mockGetAccessibleCopilotChat.mockResolvedValueOnce({
140+
id: 'chat-live',
141+
type: 'mothership',
142+
title: 'Live',
143+
messages: [],
144+
resources: [],
145+
conversationId: 'stream-live',
146+
createdAt: new Date('2026-05-11T12:00:00Z'),
147+
updatedAt: new Date('2026-05-11T12:00:00Z'),
148+
})
149+
mockGetActiveChatStreamIds.mockResolvedValueOnce(new Set(['chat-live']))
150+
mockGetLatestRunForStream.mockResolvedValueOnce({ status: 'active' })
151+
152+
const response = await GET(createRequest('chat-live'), makeContext('chat-live'))
153+
expect(response.status).toBe(200)
154+
const body = await response.json()
155+
156+
expect(body.chat.conversationId).toBe('stream-live')
157+
expect(mockReadEvents).toHaveBeenCalledWith('stream-live', '0')
158+
expect(body.chat.streamSnapshot).toBeDefined()
159+
expect(body.chat.streamSnapshot.status).toBe('active')
160+
})
161+
162+
// With no persisted conversationId there is nothing to reconcile, so the
// active-stream lookup must not be invoked at all.
it('skips the reconciliation lookup when conversationId is already null', async () => {
163+
mockGetAccessibleCopilotChat.mockResolvedValueOnce({
164+
id: 'chat-idle',
165+
type: 'mothership',
166+
title: 'Idle',
167+
messages: [],
168+
resources: [],
169+
conversationId: null,
170+
createdAt: new Date('2026-05-11T12:00:00Z'),
171+
updatedAt: new Date('2026-05-11T12:00:00Z'),
172+
})
173+
174+
const response = await GET(createRequest('chat-idle'), makeContext('chat-idle'))
175+
expect(response.status).toBe(200)
176+
177+
expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled()
178+
const body = await response.json()
179+
expect(body.chat.conversationId).toBeNull()
180+
})
181+
182+
// A null chat lookup yields 404 without consulting the active-stream lookup.
it('returns 404 when the chat does not exist', async () => {
183+
mockGetAccessibleCopilotChat.mockResolvedValueOnce(null)
184+
185+
const response = await GET(createRequest('chat-missing'), makeContext('chat-missing'))
186+
expect(response.status).toBe(404)
187+
expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled()
188+
})
189+
190+
// Failed authentication yields 401 before either the chat lookup or the
// active-stream lookup runs.
it('returns 401 when unauthenticated', async () => {
191+
copilotHttpMockFns.mockAuthenticateCopilotRequestSessionOnly.mockResolvedValueOnce({
192+
userId: null,
193+
isAuthenticated: false,
194+
})
195+
196+
const response = await GET(createRequest('chat-x'), makeContext('chat-x'))
197+
expect(response.status).toBe(401)
198+
expect(mockGetAccessibleCopilotChat).not.toHaveBeenCalled()
199+
expect(mockGetActiveChatStreamIds).not.toHaveBeenCalled()
200+
})
201+
})

apps/sim/app/api/mothership/chats/[chatId]/route.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
createUnauthorizedResponse,
2121
} from '@/lib/copilot/request/http'
2222
import type { FilePreviewSession } from '@/lib/copilot/request/session'
23+
import { getActiveChatStreamIds } from '@/lib/copilot/request/session/abort'
2324
import { readEvents } from '@/lib/copilot/request/session/buffer'
2425
import { readFilePreviewSessions } from '@/lib/copilot/request/session/file-preview-session'
2526
import { type StreamBatchEvent, toStreamBatchEvent } from '@/lib/copilot/request/session/types'
@@ -52,23 +53,34 @@ export const GET = withRouteHandler(
5253
status: string
5354
} | null = null
5455

55-
if (chat.conversationId) {
56+
// Reconcile the persisted stream marker against the canonical Redis
57+
// lock. If `conversation_id` is set but no lock is held, the stream
58+
// is no longer running (process died before finalize) — treat the
59+
// marker as null so the client doesn't try to reconnect to a dead
60+
// stream. Mirrors the same reconciliation in the task list route.
61+
const activeIds = chat.conversationId
62+
? await getActiveChatStreamIds([chat.id])
63+
: new Set<string>()
64+
const liveConversationId =
65+
chat.conversationId && activeIds.has(chat.id) ? chat.conversationId : null
66+
67+
if (liveConversationId) {
5668
try {
5769
const [events, previewSessions] = await Promise.all([
58-
readEvents(chat.conversationId, '0'),
59-
readFilePreviewSessions(chat.conversationId).catch((error) => {
70+
readEvents(liveConversationId, '0'),
71+
readFilePreviewSessions(liveConversationId).catch((error) => {
6072
logger.warn('Failed to read preview sessions for mothership chat', {
6173
chatId,
62-
conversationId: chat.conversationId,
74+
conversationId: liveConversationId,
6375
error: toError(error).message,
6476
})
6577
return []
6678
}),
6779
])
68-
const run = await getLatestRunForStream(chat.conversationId, userId).catch((error) => {
80+
const run = await getLatestRunForStream(liveConversationId, userId).catch((error) => {
6981
logger.warn('Failed to fetch latest run for mothership chat snapshot', {
7082
chatId,
71-
conversationId: chat.conversationId,
83+
conversationId: liveConversationId,
7284
error: toError(error).message,
7385
})
7486
return null
@@ -87,7 +99,7 @@ export const GET = withRouteHandler(
8799
} catch (error) {
88100
logger.warn('Failed to read stream snapshot for mothership chat', {
89101
chatId,
90-
conversationId: chat.conversationId,
102+
conversationId: liveConversationId,
91103
error: toError(error).message,
92104
})
93105
}
@@ -100,7 +112,7 @@ export const GET = withRouteHandler(
100112
: []
101113
const effectiveMessages = buildEffectiveChatTranscript({
102114
messages: normalizedMessages,
103-
activeStreamId: chat.conversationId || null,
115+
activeStreamId: liveConversationId || null,
104116
...(streamSnapshot ? { streamSnapshot } : {}),
105117
})
106118

@@ -110,7 +122,7 @@ export const GET = withRouteHandler(
110122
id: chat.id,
111123
title: chat.title,
112124
messages: effectiveMessages,
113-
conversationId: chat.conversationId || null,
125+
conversationId: liveConversationId || null,
114126
resources: Array.isArray(chat.resources) ? chat.resources : [],
115127
createdAt: chat.createdAt,
116128
updatedAt: chat.updatedAt,

0 commit comments

Comments
 (0)