Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions src/main/presenter/deepchatAgentPresenter/dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type {
import type { ChatMessage } from '@shared/types/core/chat-message'
import { nanoid } from 'nanoid'
import type { ToolOutputGuard } from './toolOutputGuard'
import { buildTerminalErrorBlocks } from './messageStore'

type PermissionType = 'read' | 'write' | 'all' | 'command'

Expand Down Expand Up @@ -734,17 +735,7 @@ export function finalize(state: StreamState, io: IoParams): void {

export function finalizeError(state: StreamState, io: IoParams, error: unknown): void {
const errorMessage = error instanceof Error ? error.message : String(error)
const errorBlock: AssistantMessageBlock = {
type: 'error',
content: errorMessage,
status: 'error',
timestamp: Date.now()
}
state.blocks.push(errorBlock)

for (const block of state.blocks) {
if (block.status === 'pending') block.status = 'error'
}
state.blocks = buildTerminalErrorBlocks(state.blocks, errorMessage)

const endTime = Date.now()
state.metadata.generationTime = endTime - state.startTime
Expand Down
173 changes: 142 additions & 31 deletions src/main/presenter/deepchatAgentPresenter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import {
} from './contextBuilder'
import { appendSummarySection, CompactionService, type CompactionIntent } from './compactionService'
import { buildPersistableMessageTracePayload } from './messageTracePayload'
import { DeepChatMessageStore } from './messageStore'
import { buildTerminalErrorBlocks, DeepChatMessageStore } from './messageStore'
import { PendingInputCoordinator } from './pendingInputCoordinator'
import { DeepChatPendingInputStore } from './pendingInputStore'
import { processStream } from './process'
Expand Down Expand Up @@ -95,6 +95,12 @@ type SystemPromptCacheEntry = {
fingerprint: string
}

type ActiveGeneration = {
runId: string
messageId: string
abortController: AbortController
}

const isReasoningEffort = (value: unknown): value is 'minimal' | 'low' | 'medium' | 'high' =>
value === 'minimal' || value === 'low' || value === 'medium' || value === 'high'

Expand All @@ -113,6 +119,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
private readonly runtimeState: Map<string, DeepChatSessionState> = new Map()
private readonly sessionGenerationSettings: Map<string, SessionGenerationSettings> = new Map()
private readonly abortControllers: Map<string, AbortController> = new Map()
private readonly activeGenerations: Map<string, ActiveGeneration> = new Map()
private readonly sessionAgentIds: Map<string, string> = new Map()
private readonly sessionProjectDirs: Map<string, string | null> = new Map()
private readonly systemPromptCache: Map<string, SystemPromptCacheEntry> = new Map()
Expand All @@ -123,6 +130,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
private readonly compactionService: CompactionService
private readonly toolOutputGuard: ToolOutputGuard
private readonly hooksBridge?: NewSessionHooksBridge
private nextRunSequence = 0

constructor(
llmProviderPresenter: ILlmProviderPresenter,
Expand Down Expand Up @@ -207,11 +215,13 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
}

async destroySession(sessionId: string): Promise<void> {
const controller = this.abortControllers.get(sessionId)
const controller =
this.activeGenerations.get(sessionId)?.abortController ?? this.abortControllers.get(sessionId)
if (controller) {
controller.abort()
this.abortControllers.delete(sessionId)
}
this.activeGenerations.delete(sessionId)

this.pendingInputCoordinator.deleteBySession(sessionId)
this.messageStore.deleteBySession(sessionId)
Expand Down Expand Up @@ -264,14 +274,19 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
throw new Error(`Session ${sessionId} not found`)
}

const record = this.pendingInputCoordinator.queuePendingInput(sessionId, content)
if (this.isAwaitingToolQuestionFollowUp(sessionId)) {
const claimedFollowUp = this.pendingInputCoordinator.claimQueuedInput(sessionId, record.id)
void this.processMessage(sessionId, claimedFollowUp.payload, {
const shouldClaimImmediately =
this.isAwaitingToolQuestionFollowUp(sessionId) ||
this.shouldStartQueuedInputImmediately(sessionId, state.status)
const record = this.pendingInputCoordinator.queuePendingInput(sessionId, content, {
state: shouldClaimImmediately ? 'claimed' : 'pending'
})

if (record.state === 'claimed') {
void this.processMessage(sessionId, record.payload, {
projectDir: this.resolveProjectDir(sessionId),
pendingQueueItemId: claimedFollowUp.id
pendingQueueItemId: record.id
})
return claimedFollowUp
return record
}

void this.drainPendingQueueIfPossible(sessionId, 'enqueue')
Expand Down Expand Up @@ -345,6 +360,8 @@ export class DeepChatAgentPresenter implements IAgentImplementation {

this.setSessionStatus(sessionId, 'generating')
let consumedPendingQueueItem = false
let userMessageId: string | null = null
let assistantMessageId: string | null = null

try {
const generationSettings = await this.getEffectiveSessionGenerationSettings(sessionId)
Expand Down Expand Up @@ -382,7 +399,6 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
preserveInterleavedReasoning: interleavedReasoning.preserveReasoningContent,
newUserContent: normalizedInput
})
let userMessageId: string
let summaryState: SessionSummaryState

if (compactionIntent) {
Expand All @@ -397,7 +413,6 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
this.messageStore.getNextOrderSeq(sessionId),
userContent
)
this.emitMessageRefresh(sessionId, userMessageId)
this.emitCompactionState(sessionId, {
status: 'compacting',
cursorOrderSeq: compactionIntent.targetCursorOrderSeq,
Expand All @@ -415,6 +430,10 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
userContent
)
}
if (!userMessageId) {
throw new Error('Failed to create user message.')
}
this.emitMessageRefresh(sessionId, userMessageId)

this.dispatchHook('UserPromptSubmit', {
sessionId,
Expand Down Expand Up @@ -442,21 +461,18 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
)

const assistantOrderSeq = this.messageStore.getNextOrderSeq(sessionId)
const assistantMessageId = this.messageStore.createAssistantMessage(
sessionId,
assistantOrderSeq
)
assistantMessageId = this.messageStore.createAssistantMessage(sessionId, assistantOrderSeq)

if (context?.pendingQueueItemId) {
this.pendingInputCoordinator.consumeQueuedInput(sessionId, context.pendingQueueItemId)
consumedPendingQueueItem = true
}

if (context?.emitRefreshBeforeStream) {
this.emitMessageRefresh(sessionId, assistantMessageId || userMessageId)
this.emitMessageRefresh(sessionId, assistantMessageId)
}

const result = await this.runStreamForMessage({
const { runId, result } = await this.runStreamForMessage({
sessionId,
messageId: assistantMessageId,
messages,
Expand All @@ -465,7 +481,11 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
tools,
interleavedReasoning
})
this.applyProcessResultStatus(sessionId, result)
try {
this.applyProcessResultStatus(sessionId, result, runId)
} finally {
this.clearActiveGeneration(sessionId, runId)
}
if (result?.status === 'completed') {
void this.drainPendingQueueIfPossible(sessionId, 'completed')
}
Expand All @@ -482,6 +502,15 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
}
}
const errorMessage = err instanceof Error ? err.message : String(err)
if (assistantMessageId) {
const existingAssistant = this.messageStore.getMessage(assistantMessageId)
const blocks = buildTerminalErrorBlocks(
existingAssistant ? this.parseAssistantBlocks(existingAssistant.content) : [],
errorMessage
)
this.messageStore.setMessageError(assistantMessageId, blocks)
this.emitMessageRefresh(sessionId, assistantMessageId)
}
this.dispatchHook('Stop', {
sessionId,
providerId: state.providerId,
Expand Down Expand Up @@ -825,10 +854,32 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
}

async cancelGeneration(sessionId: string): Promise<void> {
const controller = this.abortControllers.get(sessionId)
if (controller) {
controller.abort()
this.abortControllers.delete(sessionId)
const activeGeneration = this.activeGenerations.get(sessionId)
if (activeGeneration) {
activeGeneration.abortController.abort()
this.clearActiveGeneration(sessionId, activeGeneration.runId)

const assistantMessage = this.messageStore.getMessage(activeGeneration.messageId)
if (assistantMessage?.role === 'assistant') {
const blocks = buildTerminalErrorBlocks(
this.parseAssistantBlocks(assistantMessage.content),
'common.error.userCanceledGeneration'
)
this.messageStore.setMessageError(activeGeneration.messageId, blocks)
this.emitMessageRefresh(sessionId, activeGeneration.messageId)
}

this.dispatchTerminalHooks(sessionId, this.runtimeState.get(sessionId), {
status: 'aborted',
stopReason: 'user_stop',
errorMessage: 'common.error.userCanceledGeneration'
})
} else {
const controller = this.abortControllers.get(sessionId)
if (controller) {
controller.abort()
this.abortControllers.delete(sessionId)
}
}
this.setSessionStatus(sessionId, 'idle')
}
Expand Down Expand Up @@ -1139,7 +1190,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
initialBlocks?: AssistantMessageBlock[]
promptPreview?: string
interleavedReasoning?: InterleavedReasoningConfig
}): Promise<ProcessResult> {
}): Promise<{ runId: string; result: ProcessResult }> {
const {
sessionId,
messageId,
Expand Down Expand Up @@ -1217,7 +1268,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
const supportsVision = this.supportsVision(state.providerId, state.modelId)

const abortController = new AbortController()
this.abortControllers.set(sessionId, abortController)
const activeGeneration = this.registerActiveGeneration(sessionId, messageId, abortController)

try {
this.dispatchHook('SessionStart', {
Expand All @@ -1229,7 +1280,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
projectDir
})

return await processStream({
const result = await processStream({
messages,
tools,
toolPresenter: this.toolPresenter,
Expand Down Expand Up @@ -1356,11 +1407,13 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
abortSignal: abortController.signal
}
})
} finally {
const active = this.abortControllers.get(sessionId)
if (active === abortController) {
this.abortControllers.delete(sessionId)
return {
runId: activeGeneration.runId,
result
}
} catch (error) {
this.clearActiveGeneration(sessionId, activeGeneration.runId)
throw error
}
}

Expand Down Expand Up @@ -1444,6 +1497,22 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
}
}

private shouldStartQueuedInputImmediately(
sessionId: string,
status: DeepChatSessionState['status']
): boolean {
if (!this.canDrainPendingQueueFromStatus(status, 'enqueue')) {
return false
}
if (this.hasPendingInteractions(sessionId)) {
return false
}
if (this.drainingPendingQueues.has(sessionId)) {
return false
}
return this.pendingInputCoordinator.getNextQueuedInput(sessionId) === null
}

private canDrainPendingQueueFromStatus(
status: DeepChatSessionState['status'],
reason: 'enqueue' | 'resume' | 'completed'
Expand All @@ -1455,10 +1524,44 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
return (reason === 'enqueue' || reason === 'resume') && status === 'error'
}

private registerActiveGeneration(
sessionId: string,
messageId: string,
abortController: AbortController
): ActiveGeneration {
const generation: ActiveGeneration = {
runId: `${sessionId}:${++this.nextRunSequence}`,
messageId,
abortController
}
this.activeGenerations.set(sessionId, generation)
this.abortControllers.set(sessionId, abortController)
return generation
}

private clearActiveGeneration(sessionId: string, runId: string): void {
const activeGeneration = this.activeGenerations.get(sessionId)
if (!activeGeneration || activeGeneration.runId !== runId) {
return
}
this.activeGenerations.delete(sessionId)
if (this.abortControllers.get(sessionId) === activeGeneration.abortController) {
this.abortControllers.delete(sessionId)
}
}

private isActiveRun(sessionId: string, runId: string): boolean {
return this.activeGenerations.get(sessionId)?.runId === runId
}

private applyProcessResultStatus(
sessionId: string,
result: ProcessResult | null | undefined
result: ProcessResult | null | undefined,
runId?: string
): void {
if (runId && !this.isActiveRun(sessionId, runId)) {
return
}
const state = this.runtimeState.get(sessionId)
if (!result || !result.status) {
this.setSessionStatus(sessionId, 'idle')
Expand Down Expand Up @@ -1576,7 +1679,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
}
}

const result = await this.runStreamForMessage({
const { runId, result } = await this.runStreamForMessage({
sessionId,
messageId,
messages: resumeContext,
Expand All @@ -1585,13 +1688,21 @@ export class DeepChatAgentPresenter implements IAgentImplementation {
initialBlocks,
interleavedReasoning
})
this.applyProcessResultStatus(sessionId, result)
try {
this.applyProcessResultStatus(sessionId, result, runId)
} finally {
this.clearActiveGeneration(sessionId, runId)
}
if (result?.status === 'completed') {
void this.drainPendingQueueIfPossible(sessionId, 'completed')
}
return true
} catch (error) {
console.error('[DeepChatAgent] resumeAssistantMessage error:', error)
const errorMessage = error instanceof Error ? error.message : String(error)
const blocks = buildTerminalErrorBlocks(initialBlocks, errorMessage)
this.messageStore.setMessageError(messageId, blocks)
this.emitMessageRefresh(sessionId, messageId)
this.setSessionStatus(sessionId, 'error')
throw error
} finally {
Expand Down
Loading
Loading