Skip to content

Commit 92f26f9

Browse files
committed
address comments
1 parent 2b6c812 commit 92f26f9

16 files changed

Lines changed: 474 additions & 149 deletions

File tree

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/components/output-panel/components/structured-output.tsx

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
import { List, type RowComponentProps, useListRef } from 'react-window'
1515
import { Badge, ChevronDown } from '@/components/emcn'
1616
import { cn } from '@/lib/core/utils/cn'
17+
import { isUserFileDisplayMetadata } from '@/lib/core/utils/user-file'
1718
import { isLargeValueRef, type LargeValueRef } from '@/lib/execution/payloads/large-value-ref'
1819

1920
type ValueType = 'null' | 'undefined' | 'array' | 'string' | 'number' | 'boolean' | 'object'
@@ -89,20 +90,6 @@ function getDisplayValue(value: unknown): unknown {
8990
return isLargeValueRef(value) ? getLargeValueDisplayValue(value) : value
9091
}
9192

92-
function isDisplayedUserFileMetadata(value: unknown): value is Record<string, unknown> {
93-
if (!value || typeof value !== 'object' || Array.isArray(value)) return false
94-
const candidate = value as Record<string, unknown>
95-
const url = typeof candidate.url === 'string' ? candidate.url : ''
96-
return (
97-
typeof candidate.id === 'string' &&
98-
typeof candidate.name === 'string' &&
99-
url.length > 0 &&
100-
typeof candidate.size === 'number' &&
101-
typeof candidate.type === 'string' &&
102-
(candidate.id.startsWith('file_') || url.includes('/api/files/serve/'))
103-
)
104-
}
105-
10693
function getTypeLabel(value: unknown): ValueType {
10794
if (value === null) return 'null'
10895
if (value === undefined) return 'undefined'
@@ -151,7 +138,7 @@ function buildEntries(value: unknown, basePath: string): NodeEntry[] {
151138
value: v,
152139
path: `${basePath}.${k}`,
153140
}))
154-
if (isDisplayedUserFileMetadata(displayValue) && !('base64' in displayValue)) {
141+
if (isUserFileDisplayMetadata(displayValue) && !('base64' in displayValue)) {
155142
entries.push({
156143
key: 'base64',
157144
value: USER_FILE_BASE64_PLACEHOLDER,

apps/sim/executor/orchestrators/parallel.test.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { DAG } from '@/executor/dag/builder'
66
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
77
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
88
import type { ExecutionContext } from '@/executor/types'
9+
import { buildBranchNodeId } from '@/executor/utils/subflow-utils'
910

1011
function createDag(): DAG {
1112
return {
@@ -173,4 +174,74 @@ describe('ParallelOrchestrator', () => {
173174
expect(scope?.branchOutputs.get(20)).toEqual([{ output: 'resumed' }])
174175
expect(scope?.branchOutputs.has(0)).toBe(false)
175176
})
177+
178+
it('resets only incoming batch branch state when scheduling later batches', async () => {
179+
const dag = createDag()
180+
const incomingBranchId = buildBranchNodeId('task-1', 0)
181+
const previousBranchId = buildBranchNodeId('task-1', 1)
182+
dag.nodes.set(incomingBranchId, {
183+
id: incomingBranchId,
184+
block: {
185+
id: 'task-1',
186+
position: { x: 0, y: 0 },
187+
config: { tool: '', params: {} },
188+
inputs: {},
189+
outputs: {},
190+
metadata: { id: 'function', name: 'Task 1' },
191+
enabled: true,
192+
},
193+
incomingEdges: new Set(),
194+
outgoingEdges: new Set(),
195+
metadata: { parallelId: 'parallel-1', isParallelBranch: true, branchIndex: 0 },
196+
})
197+
dag.nodes.set(previousBranchId, {
198+
id: previousBranchId,
199+
block: {
200+
id: 'task-1',
201+
position: { x: 0, y: 0 },
202+
config: { tool: '', params: {} },
203+
inputs: {},
204+
outputs: {},
205+
metadata: { id: 'function', name: 'Task 1' },
206+
enabled: true,
207+
},
208+
incomingEdges: new Set(),
209+
outgoingEdges: new Set(),
210+
metadata: { parallelId: 'parallel-1', isParallelBranch: true, branchIndex: 1 },
211+
})
212+
const state = createState()
213+
const orchestrator = new ParallelOrchestrator(dag, state, null, {})
214+
215+
await (
216+
orchestrator as unknown as {
217+
scheduleNextBatch(
218+
ctx: ExecutionContext,
219+
scope: NonNullable<ExecutionContext['parallelExecutions']> extends Map<
220+
string,
221+
infer Scope
222+
>
223+
? Scope
224+
: never,
225+
nextBatchStart: number
226+
): Promise<void>
227+
}
228+
).scheduleNextBatch(
229+
createContext(),
230+
{
231+
parallelId: 'parallel-1',
232+
totalBranches: 3,
233+
batchSize: 1,
234+
currentBatchStart: 0,
235+
currentBatchSize: 2,
236+
accumulatedOutputs: new Map([[1, [{ output: 'previous' }]]]),
237+
branchOutputs: new Map(),
238+
},
239+
2
240+
)
241+
242+
expect(state.deleteBlockState).toHaveBeenCalledWith(incomingBranchId)
243+
expect(state.deleteBlockState).not.toHaveBeenCalledWith(previousBranchId)
244+
expect(state.unmarkExecuted).toHaveBeenCalledWith(incomingBranchId)
245+
expect(state.unmarkExecuted).not.toHaveBeenCalledWith(previousBranchId)
246+
})
176247
})

apps/sim/executor/orchestrators/parallel.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ export class ParallelOrchestrator {
390390

391391
this.registerClonedSubflows(ctx, scope.parallelId, clonedSubflows)
392392
this.registerBranchMappings(ctx, scope.parallelId, allBranchNodes)
393-
this.resetBatchExecutionState(scope.parallelId)
393+
this.resetBatchExecutionState(allBranchNodes)
394394

395395
scope.currentBatchStart = nextBatchStart
396396
scope.currentBatchSize = currentBatchSize
@@ -408,9 +408,10 @@ export class ParallelOrchestrator {
408408
})
409409
}
410410

411-
private resetBatchExecutionState(parallelId: string): void {
412-
for (const [nodeId, node] of this.dag.nodes.entries()) {
413-
if (node.metadata.parallelId !== parallelId || !node.metadata.isParallelBranch) {
411+
private resetBatchExecutionState(branchNodeIds: string[]): void {
412+
for (const nodeId of branchNodeIds) {
413+
const node = this.dag.nodes.get(nodeId)
414+
if (!node?.metadata.isParallelBranch) {
414415
continue
415416
}
416417
this.state.unmarkExecuted(nodeId)

apps/sim/executor/utils/parallel-expansion.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ describe('Nested parallel expansion + edge resolution', () => {
310310
// P3 should also be cloned (inside P2__obranch-1) with a __clone prefix
311311
const p3Clone = p1Result.clonedSubflows.find((c) => c.originalId === p3)!
312312
expect(p3Clone).toBeDefined()
313-
expect(p3Clone.clonedId).toMatch(/^p3__clone\d+__obranch-1$/)
313+
expect(p3Clone.clonedId).toMatch(/^p3__clone[0-9a-f]{24}__obranch-1$/)
314314
expect(stripCloneSuffixes(p3Clone.clonedId)).toBe('p3')
315315

316316
// Step 2: Expand P2 (original, branch 0 of P1) — this creates P3__obranch-1 at runtime

apps/sim/executor/utils/parallel-expansion.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { createLogger } from '@sim/logger'
2+
import { sha256Hex } from '@sim/security/hash'
23
import { EDGE } from '@/executor/constants'
34
import type { DAG, DAGNode } from '@/executor/dag/builder'
45
import type { SerializedBlock } from '@/serializer/types'
@@ -288,7 +289,7 @@ export class ParallelExpander {
288289
/**
289290
* Generates a unique clone ID for pre-expansion cloning.
290291
*
291-
* Pre-expansion clones use `{originalId}__clone{hash}__obranch-{branchIndex}` instead
292+
* Pre-expansion clones use `{originalId}__clone{digest}__obranch-{branchIndex}` instead
292293
* of the plain `{originalId}__obranch-{branchIndex}` used by runtime expansion.
293294
* The clone segment prevents naming collisions when the original (branch-0)
294295
* subflow later expands at runtime and creates `{child}__obranch-{branchIndex}`.
@@ -299,12 +300,9 @@ export class ParallelExpander {
299300
outerBranchIndex: number,
300301
parentCloneId: string
301302
): string {
302-
let hash = 0
303303
const input = `${parentCloneId}:${originalId}:${outerBranchIndex}`
304-
for (let i = 0; i < input.length; i++) {
305-
hash = (hash * 31 + input.charCodeAt(i)) >>> 0
306-
}
307-
return `${originalId}__clone${hash}__obranch-${outerBranchIndex}`
304+
const digest = sha256Hex(input).slice(0, 24)
305+
return `${originalId}__clone${digest}__obranch-${outerBranchIndex}`
308306
}
309307

310308
/**

apps/sim/executor/utils/subflow-utils.test.ts

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,84 +4,86 @@
44
import { describe, expect, it, vi } from 'vitest'
55
import type { ExecutionContext } from '@/executor/types'
66
import type { VariableResolver } from '@/executor/variables/resolver'
7-
import { findEffectiveContainerId, resolveArrayInput } from './subflow-utils'
7+
import { findEffectiveContainerId, resolveArrayInputAsync } from './subflow-utils'
88

9-
describe('resolveArrayInput', () => {
9+
describe('resolveArrayInputAsync', () => {
1010
const fakeCtx = {} as unknown as ExecutionContext
1111

12-
it('returns arrays as-is', () => {
13-
expect(resolveArrayInput(fakeCtx, [1, 2, 3], null)).toEqual([1, 2, 3])
12+
it('returns arrays as-is', async () => {
13+
await expect(resolveArrayInputAsync(fakeCtx, [1, 2, 3], null)).resolves.toEqual([1, 2, 3])
1414
})
1515

16-
it('converts plain objects to entries', () => {
17-
expect(resolveArrayInput(fakeCtx, { a: 1, b: 2 }, null)).toEqual([
16+
it('converts plain objects to entries', async () => {
17+
await expect(resolveArrayInputAsync(fakeCtx, { a: 1, b: 2 }, null)).resolves.toEqual([
1818
['a', 1],
1919
['b', 2],
2020
])
2121
})
2222

23-
it('returns empty array when a pure reference resolves to null (skipped block)', () => {
23+
it('returns empty array when a pure reference resolves to null (skipped block)', async () => {
2424
// `resolveSingleReference` returns `null` for a reference that points at a
2525
// block that exists in the workflow but did not execute on this path.
2626
// A loop/parallel over such a reference should run zero iterations rather
2727
// than fail the workflow.
2828
const resolver = {
29-
resolveSingleReference: vi.fn().mockReturnValue(null),
29+
resolveSingleReference: vi.fn().mockResolvedValue(null),
3030
} as unknown as VariableResolver
3131

32-
const result = resolveArrayInput(fakeCtx, '<SkippedBlock.result.items>', resolver)
32+
const result = await resolveArrayInputAsync(fakeCtx, '<SkippedBlock.result.items>', resolver)
3333

3434
expect(result).toEqual([])
3535
expect(resolver.resolveSingleReference).toHaveBeenCalled()
3636
})
3737

38-
it('returns the array from a pure reference that resolved to an array', () => {
38+
it('returns the array from a pure reference that resolved to an array', async () => {
3939
const resolver = {
40-
resolveSingleReference: vi.fn().mockReturnValue([1, 2, 3]),
40+
resolveSingleReference: vi.fn().mockResolvedValue([1, 2, 3]),
4141
} as unknown as VariableResolver
4242

43-
expect(resolveArrayInput(fakeCtx, '<Block.items>', resolver)).toEqual([1, 2, 3])
43+
await expect(resolveArrayInputAsync(fakeCtx, '<Block.items>', resolver)).resolves.toEqual([
44+
1, 2, 3,
45+
])
4446
})
4547

46-
it('converts resolved objects to entries', () => {
48+
it('converts resolved objects to entries', async () => {
4749
const resolver = {
48-
resolveSingleReference: vi.fn().mockReturnValue({ x: 1, y: 2 }),
50+
resolveSingleReference: vi.fn().mockResolvedValue({ x: 1, y: 2 }),
4951
} as unknown as VariableResolver
5052

51-
expect(resolveArrayInput(fakeCtx, '<Block.obj>', resolver)).toEqual([
53+
await expect(resolveArrayInputAsync(fakeCtx, '<Block.obj>', resolver)).resolves.toEqual([
5254
['x', 1],
5355
['y', 2],
5456
])
5557
})
5658

57-
it('throws when a pure reference resolves to a non-array, non-object, non-null value', () => {
59+
it('throws when a pure reference resolves to a non-array, non-object, non-null value', async () => {
5860
const resolver = {
59-
resolveSingleReference: vi.fn().mockReturnValue(42),
61+
resolveSingleReference: vi.fn().mockResolvedValue(42),
6062
} as unknown as VariableResolver
6163

62-
expect(() => resolveArrayInput(fakeCtx, '<Block.count>', resolver)).toThrow(
64+
await expect(resolveArrayInputAsync(fakeCtx, '<Block.count>', resolver)).rejects.toThrow(
6365
/did not resolve to an array or object/
6466
)
6567
})
6668

67-
it('throws when a pure reference resolves to undefined (unknown block)', () => {
69+
it('throws when a pure reference resolves to undefined (unknown block)', async () => {
6870
// `undefined` means the reference could not be matched to any block at
6971
// all (typo / deleted block). This must still fail loudly.
7072
const resolver = {
71-
resolveSingleReference: vi.fn().mockReturnValue(undefined),
73+
resolveSingleReference: vi.fn().mockResolvedValue(undefined),
7274
} as unknown as VariableResolver
7375

74-
expect(() => resolveArrayInput(fakeCtx, '<Missing.items>', resolver)).toThrow(
76+
await expect(resolveArrayInputAsync(fakeCtx, '<Missing.items>', resolver)).rejects.toThrow(
7577
/did not resolve to an array or object/
7678
)
7779
})
7880

79-
it('parses a JSON array string', () => {
80-
expect(resolveArrayInput(fakeCtx, '[1, 2, 3]', null)).toEqual([1, 2, 3])
81+
it('parses a JSON array string', async () => {
82+
await expect(resolveArrayInputAsync(fakeCtx, '[1, 2, 3]', null)).resolves.toEqual([1, 2, 3])
8183
})
8284

83-
it('throws on a string that is neither a reference nor valid JSON array/object', () => {
84-
expect(() => resolveArrayInput(fakeCtx, 'not json', null)).toThrow()
85+
it('throws on a string that is neither a reference nor valid JSON array/object', async () => {
86+
await expect(resolveArrayInputAsync(fakeCtx, 'not json', null)).rejects.toThrow()
8587
})
8688
})
8789

0 commit comments

Comments
 (0)