Skip to content

Commit 3c30acb

Browse files
committed
feat(transport): replace shared chat transport with mothership-stream module
1 parent 944c0ef commit 3c30acb

File tree

41 files changed

+2000
-1959
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2000
-1959
lines changed

apps/sim/app/api/copilot/chat/abort/route.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { NextResponse } from 'next/server'
22
import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
3-
import { abortActiveStream, waitForPendingChatStream } from '@/lib/copilot/chat-streaming'
3+
import { abortActiveStream } from '@/lib/copilot/chat-streaming'
44
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
55
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
66
import { env } from '@/lib/core/config/env'
@@ -55,10 +55,5 @@ export async function POST(request: Request) {
5555
}
5656

5757
const aborted = await abortActiveStream(streamId)
58-
if (chatId) {
59-
await waitForPendingChatStream(chatId, GO_EXPLICIT_ABORT_TIMEOUT_MS + 1000, streamId).catch(
60-
() => false
61-
)
62-
}
6358
return NextResponse.json({ aborted })
6459
}

apps/sim/app/api/copilot/chat/route.ts

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,12 @@ import { getSession } from '@/lib/auth'
88
import { getAccessibleCopilotChat, resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
99
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
1010
import {
11-
acquirePendingChatStream,
1211
createSSEStream,
13-
releasePendingChatStream,
1412
requestChatTitle,
1513
SSE_RESPONSE_HEADERS,
1614
} from '@/lib/copilot/chat-streaming'
1715
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
1816
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
19-
import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer'
2017
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
2118
import { resolveActiveResourceContext } from '@/lib/copilot/process-contents'
2219
import {
@@ -110,9 +107,6 @@ const ChatMessageSchema = z.object({
110107
export async function POST(req: NextRequest) {
111108
const tracker = createRequestTracker()
112109
let actualChatId: string | undefined
113-
let pendingChatStreamAcquired = false
114-
let pendingChatStreamHandedOff = false
115-
let pendingChatStreamID: string | undefined
116110

117111
try {
118112
// Get session to access user information including name
@@ -340,21 +334,6 @@ export async function POST(req: NextRequest) {
340334
})
341335
} catch {}
342336

343-
if (stream && actualChatId) {
344-
const acquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse)
345-
if (!acquired) {
346-
return NextResponse.json(
347-
{
348-
error:
349-
'A response is already in progress for this chat. Wait for it to finish or use Stop.',
350-
},
351-
{ status: 409 }
352-
)
353-
}
354-
pendingChatStreamAcquired = true
355-
pendingChatStreamID = userMessageIdToUse
356-
}
357-
358337
if (actualChatId) {
359338
const userMsg = {
360339
id: userMessageIdToUse,
@@ -401,7 +380,6 @@ export async function POST(req: NextRequest) {
401380
titleProvider: provider,
402381
requestId: tracker.requestId,
403382
workspaceId: resolvedWorkspaceId,
404-
pendingChatStreamAlreadyRegistered: Boolean(actualChatId && stream),
405383
orchestrateOptions: {
406384
userId: authenticatedUserId,
407385
workflowId,
@@ -489,7 +467,6 @@ export async function POST(req: NextRequest) {
489467
},
490468
},
491469
})
492-
pendingChatStreamHandedOff = true
493470

494471
return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS })
495472
}
@@ -595,14 +572,6 @@ export async function POST(req: NextRequest) {
595572
},
596573
})
597574
} catch (error) {
598-
if (
599-
actualChatId &&
600-
pendingChatStreamAcquired &&
601-
!pendingChatStreamHandedOff &&
602-
pendingChatStreamID
603-
) {
604-
await releasePendingChatStream(actualChatId, pendingChatStreamID).catch(() => {})
605-
}
606575
const duration = tracker.getDuration()
607576

608577
if (error instanceof z.ZodError) {
@@ -649,30 +618,6 @@ export async function GET(req: NextRequest) {
649618
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
650619
}
651620

652-
let streamSnapshot: {
653-
events: Array<{ eventId: number; streamId: string; event: Record<string, unknown> }>
654-
status: string
655-
} | null = null
656-
657-
if (chat.conversationId) {
658-
try {
659-
const [meta, events] = await Promise.all([
660-
getStreamMeta(chat.conversationId),
661-
readStreamEvents(chat.conversationId, 0),
662-
])
663-
streamSnapshot = {
664-
events: events || [],
665-
status: meta?.status || 'unknown',
666-
}
667-
} catch (err) {
668-
logger.warn('Failed to read stream snapshot for chat', {
669-
chatId,
670-
conversationId: chat.conversationId,
671-
error: err instanceof Error ? err.message : String(err),
672-
})
673-
}
674-
}
675-
676621
const transformedChat = {
677622
id: chat.id,
678623
title: chat.title,
@@ -681,11 +626,10 @@ export async function GET(req: NextRequest) {
681626
messageCount: Array.isArray(chat.messages) ? chat.messages.length : 0,
682627
planArtifact: chat.planArtifact || null,
683628
config: chat.config || null,
684-
conversationId: chat.conversationId || null,
629+
activeStreamId: chat.conversationId || null,
685630
resources: Array.isArray(chat.resources) ? chat.resources : [],
686631
createdAt: chat.createdAt,
687632
updatedAt: chat.updatedAt,
688-
...(streamSnapshot ? { streamSnapshot } : {}),
689633
}
690634

691635
logger.info(`Retrieved chat ${chatId}`)

apps/sim/app/api/copilot/chat/stream/route.test.ts

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,37 @@
55
import { NextRequest } from 'next/server'
66
import { beforeEach, describe, expect, it, vi } from 'vitest'
77

8-
const { getStreamMeta, readStreamEvents, authenticateCopilotRequestSessionOnly } = vi.hoisted(
9-
() => ({
10-
getStreamMeta: vi.fn(),
11-
readStreamEvents: vi.fn(),
12-
authenticateCopilotRequestSessionOnly: vi.fn(),
13-
})
14-
)
8+
const {
9+
getLatestRunForStream,
10+
readEnvelopes,
11+
checkForReplayGap,
12+
authenticateCopilotRequestSessionOnly,
13+
} = vi.hoisted(() => ({
14+
getLatestRunForStream: vi.fn(),
15+
readEnvelopes: vi.fn(),
16+
checkForReplayGap: vi.fn(),
17+
authenticateCopilotRequestSessionOnly: vi.fn(),
18+
}))
19+
20+
vi.mock('@/lib/copilot/async-runs/repository', () => ({
21+
getLatestRunForStream,
22+
}))
1523

16-
vi.mock('@/lib/copilot/orchestrator/stream/buffer', () => ({
17-
getStreamMeta,
18-
readStreamEvents,
24+
vi.mock('@/lib/copilot/mothership-stream', () => ({
25+
readEnvelopes,
26+
checkForReplayGap,
27+
encodeSSEEnvelope: (event: Record<string, unknown>) =>
28+
new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`),
29+
SSE_RESPONSE_HEADERS: {
30+
'Content-Type': 'text/event-stream',
31+
},
1932
}))
2033

2134
vi.mock('@/lib/copilot/request-helpers', () => ({
2235
authenticateCopilotRequestSessionOnly,
2336
}))
2437

25-
import { GET } from '@/app/api/copilot/chat/stream/route'
38+
import { GET } from './route'
2639

2740
describe('copilot chat stream replay route', () => {
2841
beforeEach(() => {
@@ -31,29 +44,32 @@ describe('copilot chat stream replay route', () => {
3144
userId: 'user-1',
3245
isAuthenticated: true,
3346
})
34-
readStreamEvents.mockResolvedValue([])
47+
readEnvelopes.mockResolvedValue([])
48+
checkForReplayGap.mockResolvedValue(null)
3549
})
3650

37-
it('stops replay polling when stream meta becomes cancelled', async () => {
38-
getStreamMeta
51+
it('stops replay polling when run becomes cancelled', async () => {
52+
getLatestRunForStream
3953
.mockResolvedValueOnce({
4054
status: 'active',
41-
userId: 'user-1',
55+
executionId: 'exec-1',
56+
id: 'run-1',
4257
})
4358
.mockResolvedValueOnce({
4459
status: 'cancelled',
45-
userId: 'user-1',
60+
executionId: 'exec-1',
61+
id: 'run-1',
4662
})
4763

4864
const response = await GET(
49-
new NextRequest('http://localhost:3000/api/copilot/chat/stream?streamId=stream-1')
65+
new NextRequest('http://localhost:3000/api/copilot/chat/stream?streamId=stream-1&after=0')
5066
)
5167

5268
const reader = response.body?.getReader()
5369
expect(reader).toBeTruthy()
5470

5571
const first = await reader!.read()
5672
expect(first.done).toBe(true)
57-
expect(getStreamMeta).toHaveBeenCalledTimes(2)
73+
expect(getLatestRunForStream).toHaveBeenCalledTimes(2)
5874
})
5975
})

0 commit comments

Comments
 (0)