Skip to content

Commit 2559494

Browse files
committed
Fix merge conflicts
1 parent 6b963d7 commit 2559494

File tree

7 files changed

+453
-27
lines changed

7 files changed

+453
-27
lines changed

apps/sim/lib/copilot/orchestrator/index.ts

Lines changed: 258 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
import { createLogger } from '@sim/logger'
22
import { updateRunStatus } from '@/lib/copilot/async-runs/repository'
33
import { SIM_AGENT_API_URL, SIM_AGENT_VERSION } from '@/lib/copilot/constants'
4-
import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
5-
import type {
6-
ExecutionContext,
7-
OrchestratorOptions,
8-
OrchestratorResult,
9-
SSEEvent,
4+
import {
5+
isToolAvailableOnSimSide,
6+
prepareExecutionContext,
7+
} from '@/lib/copilot/orchestrator/tool-executor'
8+
import {
9+
type ExecutionContext,
10+
isTerminalToolCallStatus,
11+
type OrchestratorOptions,
12+
type OrchestratorResult,
13+
type SSEEvent,
14+
type ToolCallState,
1015
} from '@/lib/copilot/orchestrator/types'
1116
import { env } from '@/lib/core/config/env'
1217
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
@@ -19,6 +24,7 @@ const RESUME_UPSTREAM_MAX_ATTEMPTS = 3
1924
const RESUME_UPSTREAM_RETRY_MS = 500
2025
const ASYNC_RESUME_DIAG_TAG = '[ASYNC_RESUME_DIAG]'
2126

27+
<<<<<<< HEAD
2228
interface CheckpointReadyResponse {
2329
success?: boolean
2430
checkpointId?: string
@@ -31,6 +37,15 @@ interface CheckpointReadyResponse {
3137
code?: string
3238
retryable?: boolean
3339
}
40+
=======
41+
function didAsyncToolSucceed(input: {
42+
durableStatus?: string | null
43+
durableResult?: Record<string, unknown>
44+
durableError?: string | null
45+
toolStateStatus?: string | undefined
46+
}) {
47+
const { durableStatus, durableResult, durableError, toolStateStatus } = input
48+
>>>>>>> 0c80438ed (fix(mothership): async resume and tool result ordering (#3735))
3449

3550
function sleep(ms: number): Promise<void> {
3651
return new Promise((resolve) => setTimeout(resolve, ms))
@@ -122,12 +137,27 @@ async function waitForCheckpointReady(
122137
}
123138
}
124139

140+
<<<<<<< HEAD
125141
return {
126142
checkpointId,
127143
ready: false,
128144
error: 'Checkpoint did not become ready in time',
129145
retryable: true,
130146
}
147+
=======
148+
if (toolStateStatus === 'success') return true
149+
if (toolStateStatus === 'error' || toolStateStatus === 'cancelled') return false
150+
151+
return false
152+
}
153+
154+
interface ReadyContinuationTool {
155+
toolCallId: string
156+
toolState?: ToolCallState
157+
durableRow?: Awaited<ReturnType<typeof getAsyncToolCall>>
158+
needsDurableClaim: boolean
159+
alreadyClaimedByWorker: boolean
160+
>>>>>>> 0c80438ed (fix(mothership): async resume and tool result ordering (#3735))
131161
}
132162

133163
export interface OrchestrateStreamOptions extends OrchestratorOptions {
@@ -277,6 +307,7 @@ export async function orchestrateCopilotStream(
277307
if (!continuation) break
278308

279309
let resumeReady = false
310+
<<<<<<< HEAD
280311
const localPendingPromises = continuation.pendingToolCallIds
281312
.map((toolCallId) => context.pendingToolPromises.get(toolCallId))
282313
.filter(
@@ -298,6 +329,227 @@ export async function orchestrateCopilotStream(
298329
await Promise.allSettled(localPendingPromises)
299330
logger.warn(ASYNC_RESUME_DIAG_TAG, {
300331
phase: 'local_async_tools_settled',
332+
=======
333+
let resumeRetries = 0
334+
for (;;) {
335+
claimedToolCallIds = []
336+
claimedByWorkerId = null
337+
const resumeWorkerId = continuation.runId || context.runId || context.messageId
338+
const readyTools: ReadyContinuationTool[] = []
339+
const localPendingPromises: Promise<unknown>[] = []
340+
const missingToolCallIds: string[] = []
341+
342+
for (const toolCallId of continuation.pendingToolCallIds) {
343+
const durableRow = await getAsyncToolCall(toolCallId).catch(() => null)
344+
const localPendingPromise = context.pendingToolPromises.get(toolCallId)
345+
const toolState = context.toolCalls.get(toolCallId)
346+
347+
if (localPendingPromise) {
348+
localPendingPromises.push(localPendingPromise)
349+
logger.info('Waiting for local async tool completion before retrying resume claim', {
350+
toolCallId,
351+
runId: continuation.runId,
352+
})
353+
continue
354+
}
355+
356+
if (durableRow && isTerminalAsyncStatus(durableRow.status)) {
357+
if (durableRow.claimedBy && durableRow.claimedBy !== resumeWorkerId) {
358+
missingToolCallIds.push(toolCallId)
359+
logger.warn('Async tool continuation is waiting on a claim held by another worker', {
360+
toolCallId,
361+
runId: continuation.runId,
362+
claimedBy: durableRow.claimedBy,
363+
})
364+
continue
365+
}
366+
readyTools.push({
367+
toolCallId,
368+
toolState,
369+
durableRow,
370+
needsDurableClaim: durableRow.claimedBy !== resumeWorkerId,
371+
alreadyClaimedByWorker: durableRow.claimedBy === resumeWorkerId,
372+
})
373+
continue
374+
}
375+
376+
if (
377+
!durableRow &&
378+
toolState &&
379+
isTerminalToolCallStatus(toolState.status) &&
380+
!isToolAvailableOnSimSide(toolState.name)
381+
) {
382+
logger.info('Including Go-handled tool in resume payload (no Sim-side row)', {
383+
toolCallId,
384+
toolName: toolState.name,
385+
status: toolState.status,
386+
runId: continuation.runId,
387+
})
388+
readyTools.push({
389+
toolCallId,
390+
toolState,
391+
needsDurableClaim: false,
392+
alreadyClaimedByWorker: false,
393+
})
394+
continue
395+
}
396+
397+
logger.warn('Skipping already-claimed or missing async tool resume', {
398+
toolCallId,
399+
runId: continuation.runId,
400+
durableStatus: durableRow?.status,
401+
toolStateStatus: toolState?.status,
402+
})
403+
missingToolCallIds.push(toolCallId)
404+
}
405+
406+
if (localPendingPromises.length > 0) {
407+
await Promise.allSettled(localPendingPromises)
408+
continue
409+
}
410+
411+
if (missingToolCallIds.length > 0) {
412+
if (resumeRetries < 3) {
413+
resumeRetries++
414+
logger.info('Retrying async resume after some tool calls were not yet ready', {
415+
checkpointId: continuation.checkpointId,
416+
runId: continuation.runId,
417+
retry: resumeRetries,
418+
missingToolCallIds,
419+
})
420+
await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries))
421+
continue
422+
}
423+
throw new Error(
424+
`Failed to resume async tool continuation: pending tool calls were not ready (${missingToolCallIds.join(', ')})`
425+
)
426+
}
427+
428+
if (readyTools.length === 0) {
429+
if (resumeRetries < 3 && continuation.pendingToolCallIds.length > 0) {
430+
resumeRetries++
431+
logger.info('Retrying async resume because no tool calls were ready yet', {
432+
checkpointId: continuation.checkpointId,
433+
runId: continuation.runId,
434+
retry: resumeRetries,
435+
})
436+
await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries))
437+
continue
438+
}
439+
throw new Error('Failed to resume async tool continuation: no tool calls were ready')
440+
}
441+
442+
const claimCandidates = readyTools.filter((tool) => tool.needsDurableClaim)
443+
const newlyClaimedToolCallIds: string[] = []
444+
const claimFailures: string[] = []
445+
446+
for (const tool of claimCandidates) {
447+
const claimed = await claimCompletedAsyncToolCall(tool.toolCallId, resumeWorkerId).catch(
448+
() => null
449+
)
450+
if (!claimed) {
451+
claimFailures.push(tool.toolCallId)
452+
continue
453+
}
454+
newlyClaimedToolCallIds.push(tool.toolCallId)
455+
}
456+
457+
if (claimFailures.length > 0) {
458+
if (newlyClaimedToolCallIds.length > 0) {
459+
logger.info('Releasing async tool claims after claim contention during resume', {
460+
checkpointId: continuation.checkpointId,
461+
runId: continuation.runId,
462+
newlyClaimedToolCallIds,
463+
claimFailures,
464+
})
465+
await Promise.all(
466+
newlyClaimedToolCallIds.map((toolCallId) =>
467+
releaseCompletedAsyncToolClaim(toolCallId, resumeWorkerId).catch(() => null)
468+
)
469+
)
470+
}
471+
if (resumeRetries < 3) {
472+
resumeRetries++
473+
logger.info('Retrying async resume after claim contention', {
474+
checkpointId: continuation.checkpointId,
475+
runId: continuation.runId,
476+
retry: resumeRetries,
477+
claimFailures,
478+
})
479+
await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries))
480+
continue
481+
}
482+
throw new Error(
483+
`Failed to resume async tool continuation: unable to claim tool calls (${claimFailures.join(', ')})`
484+
)
485+
}
486+
487+
claimedToolCallIds = [
488+
...readyTools
489+
.filter((tool) => tool.alreadyClaimedByWorker)
490+
.map((tool) => tool.toolCallId),
491+
...newlyClaimedToolCallIds,
492+
]
493+
claimedByWorkerId = claimedToolCallIds.length > 0 ? resumeWorkerId : null
494+
495+
logger.info('Resuming async tool continuation', {
496+
checkpointId: continuation.checkpointId,
497+
runId: continuation.runId,
498+
toolCallIds: readyTools.map((tool) => tool.toolCallId),
499+
})
500+
501+
const durableRows = await getAsyncToolCalls(
502+
readyTools.map((tool) => tool.toolCallId)
503+
).catch(() => [])
504+
const durableByToolCallId = new Map(durableRows.map((row) => [row.toolCallId, row]))
505+
506+
const results = await Promise.all(
507+
readyTools.map(async (tool) => {
508+
const durable = durableByToolCallId.get(tool.toolCallId) || tool.durableRow
509+
const durableStatus = durable?.status
510+
const durableResult =
511+
durable?.result && typeof durable.result === 'object'
512+
? (durable.result as Record<string, unknown>)
513+
: undefined
514+
const success = didAsyncToolSucceed({
515+
durableStatus,
516+
durableResult,
517+
durableError: durable?.error,
518+
toolStateStatus: tool.toolState?.status,
519+
})
520+
const data =
521+
durableResult ||
522+
(tool.toolState?.result?.output as Record<string, unknown> | undefined) ||
523+
(success
524+
? { message: 'Tool completed' }
525+
: {
526+
error: durable?.error || tool.toolState?.error || 'Tool failed',
527+
})
528+
529+
if (
530+
durableStatus &&
531+
!isTerminalAsyncStatus(durableStatus) &&
532+
!isDeliveredAsyncStatus(durableStatus)
533+
) {
534+
logger.warn('Async tool row was claimed for resume without terminal durable state', {
535+
toolCallId: tool.toolCallId,
536+
status: durableStatus,
537+
})
538+
}
539+
540+
return {
541+
callId: tool.toolCallId,
542+
name: durable?.toolName || tool.toolState?.name || '',
543+
data,
544+
success,
545+
}
546+
})
547+
)
548+
549+
context.awaitingAsyncContinuation = undefined
550+
route = '/api/tools/resume'
551+
payload = {
552+
>>>>>>> 0c80438ed (fix(mothership): async resume and tool result ordering (#3735))
301553
checkpointId: continuation.checkpointId,
302554
runId: continuation.runId,
303555
pendingToolCallIds: continuation.pendingToolCallIds,

apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,4 +242,76 @@ describe('sse-handlers tool lifecycle', () => {
242242
expect(markToolComplete).toHaveBeenCalledTimes(1)
243243
expect(context.toolCalls.get('tool-upsert-fail')?.status).toBe('success')
244244
})
245+
246+
it('does not execute a tool if a terminal tool_result arrives before local execution starts', async () => {
247+
let resolveUpsert: ((value: null) => void) | undefined
248+
upsertAsyncToolCall.mockImplementationOnce(
249+
() =>
250+
new Promise((resolve) => {
251+
resolveUpsert = resolve
252+
})
253+
)
254+
const onEvent = vi.fn()
255+
256+
await sseHandlers.tool_call(
257+
{
258+
type: 'tool_call',
259+
data: { id: 'tool-race', name: 'read', arguments: { workflowId: 'workflow-1' } },
260+
} as any,
261+
context,
262+
execContext,
263+
{ onEvent, interactive: false, timeout: 1000 }
264+
)
265+
266+
await sseHandlers.tool_result(
267+
{
268+
type: 'tool_result',
269+
toolCallId: 'tool-race',
270+
data: { id: 'tool-race', success: true, result: { ok: true } },
271+
} as any,
272+
context,
273+
execContext,
274+
{ onEvent, interactive: false, timeout: 1000 }
275+
)
276+
277+
resolveUpsert?.(null)
278+
await new Promise((resolve) => setTimeout(resolve, 0))
279+
280+
expect(executeToolServerSide).not.toHaveBeenCalled()
281+
expect(markToolComplete).not.toHaveBeenCalled()
282+
expect(context.toolCalls.get('tool-race')?.status).toBe('success')
283+
expect(context.toolCalls.get('tool-race')?.result?.output).toEqual({ ok: true })
284+
})
285+
286+
it('does not execute a tool if a tool_result arrives before the tool_call event', async () => {
287+
const onEvent = vi.fn()
288+
289+
await sseHandlers.tool_result(
290+
{
291+
type: 'tool_result',
292+
toolCallId: 'tool-early-result',
293+
toolName: 'read',
294+
data: { id: 'tool-early-result', name: 'read', success: true, result: { ok: true } },
295+
} as any,
296+
context,
297+
execContext,
298+
{ onEvent, interactive: false, timeout: 1000 }
299+
)
300+
301+
await sseHandlers.tool_call(
302+
{
303+
type: 'tool_call',
304+
data: { id: 'tool-early-result', name: 'read', arguments: { workflowId: 'workflow-1' } },
305+
} as any,
306+
context,
307+
execContext,
308+
{ onEvent, interactive: false, timeout: 1000 }
309+
)
310+
311+
await new Promise((resolve) => setTimeout(resolve, 0))
312+
313+
expect(executeToolServerSide).not.toHaveBeenCalled()
314+
expect(markToolComplete).not.toHaveBeenCalled()
315+
expect(context.toolCalls.get('tool-early-result')?.status).toBe('success')
316+
})
245317
})

0 commit comments

Comments
 (0)