Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/fast-owls-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@workflow/core": patch
---

perf: parallelize suspension handler and refactor runtime

- Process hooks first, then steps and waits in parallel to prevent race conditions
- Refactor runtime.ts into modular files: `suspension-handler.ts`, `step-handler.ts`, `helpers.ts`
- Add OpenTelemetry attributes for hooks created (`workflow.hooks.created`) and waits created (`workflow.waits.created`)
- Update suspension status from `pending_steps` to `workflow_suspended`
144 changes: 50 additions & 94 deletions packages/core/e2e/bench.bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ async function triggerWorkflow(
async function getWorkflowReturnValue(
runId: string
): Promise<{ run: any; value: any }> {
const MAX_UNEXPECTED_CONTENT_RETRIES = 3;
let unexpectedContentRetries = 0;

// We need to poll the GET endpoint until the workflow run is completed.
while (true) {
const url = new URL('/api/trigger', deploymentUrl);
Expand Down Expand Up @@ -105,7 +108,24 @@ async function getWorkflowReturnValue(
return { run, value: res.body };
}

throw new Error(`Unexpected content type: ${contentType}`);
// Unexpected content type - log details and retry
unexpectedContentRetries++;
const responseText = await res.text().catch(() => '<failed to read body>');
console.warn(
`[bench] Unexpected content type for runId=${runId} (attempt ${unexpectedContentRetries}/${MAX_UNEXPECTED_CONTENT_RETRIES}):\n` +
` Status: ${res.status}\n` +
` Content-Type: ${contentType}\n` +
` Response: ${responseText.slice(0, 500)}${responseText.length > 500 ? '...' : ''}`
);

if (unexpectedContentRetries >= MAX_UNEXPECTED_CONTENT_RETRIES) {
throw new Error(
`Unexpected content type after ${MAX_UNEXPECTED_CONTENT_RETRIES} retries: ${contentType} (status=${res.status})`
);
}

// Wait before retrying
await new Promise((resolve) => setTimeout(resolve, 500));
}
}

Expand Down Expand Up @@ -293,16 +313,6 @@ describe('Workflow Performance Benchmarks', () => {
{ time: 5000, iterations: 5, warmupIterations: 1, teardown }
);

bench(
'workflow with 10 parallel steps',
async () => {
const { runId } = await triggerWorkflow('tenParallelStepsWorkflow', []);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('workflow with 10 parallel steps', run);
},
{ time: 5000, iterations: 5, warmupIterations: 1, teardown }
);

bench(
'workflow with stream',
async () => {
Expand Down Expand Up @@ -341,88 +351,34 @@ describe('Workflow Performance Benchmarks', () => {
{ time: 5000, warmupIterations: 1, teardown }
);

// Stress tests for large concurrent step counts
// These reproduce reported issues with Promise.race/Promise.all at scale

bench(
'stress test: Promise.all with 100 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseAllStressTestWorkflow',
[100]
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.all with 100 concurrent steps', run);
},
{ time: 30000, iterations: 1, warmupIterations: 0, teardown }
);

// TODO: Re-enable after performance optimizations (see beads issue wrk-fyx)
bench.skip(
'stress test: Promise.all with 500 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseAllStressTestWorkflow',
[500]
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.all with 500 concurrent steps', run);
},
{ time: 60000, iterations: 1, warmupIterations: 0, teardown }
);

// TODO: Re-enable after performance optimizations (see beads issue wrk-fyx)
bench.skip(
'stress test: Promise.all with 1000 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseAllStressTestWorkflow',
[1000]
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.all with 1000 concurrent steps', run);
},
{ time: 120000, iterations: 1, warmupIterations: 0, teardown }
);

bench(
'stress test: Promise.race with 100 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseRaceStressTestLargeWorkflow',
[100]
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.race with 100 concurrent steps', run);
},
{ time: 30000, iterations: 1, warmupIterations: 0, teardown }
);

// TODO: Re-enable after performance optimizations (see beads issue wrk-fyx)
bench.skip(
'stress test: Promise.race with 500 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseRaceStressTestLargeWorkflow',
[500]
// Concurrent step benchmarks for Promise.all/Promise.race at various scales.
// Each entry configures one benchmark per workflow type in the loop below:
//   count - passed to triggerWorkflow as `[count]` and shown in the benchmark name
//   skip  - when true, the benchmark is registered through bench.skip instead of bench
//   time  - forwarded as the `time` bench option
//           (NOTE(review): presumably milliseconds — confirm against the bench runner docs)
const concurrentStepCounts = [
{ count: 10, skip: false, time: 30000 },
{ count: 25, skip: false, time: 30000 },
{ count: 100, skip: true, time: 60000 },
{ count: 500, skip: true, time: 120000 },
{ count: 1000, skip: true, time: 180000 },
] as const;

// Workflow variants exercised at every scale in concurrentStepCounts:
//   type     - label used as the prefix of the generated benchmark name
//   workflow - workflow name handed to triggerWorkflow
const concurrentStepTypes = [
{ type: 'Promise.all', workflow: 'promiseAllStressTestWorkflow' },
{ type: 'Promise.race', workflow: 'promiseRaceStressTestLargeWorkflow' },
] as const;

for (const { type, workflow } of concurrentStepTypes) {
for (const { count, skip, time } of concurrentStepCounts) {
const name = `${type} with ${count} concurrent steps`;
const benchFn = skip ? bench.skip : bench;

benchFn(
name,
async () => {
const { runId } = await triggerWorkflow(workflow, [count]);
const { run } = await getWorkflowReturnValue(runId);
stageTiming(name, run);
},
{ time, iterations: 1, warmupIterations: 0, teardown }
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.race with 500 concurrent steps', run);
},
{ time: 60000, iterations: 1, warmupIterations: 0, teardown }
);

// TODO: Re-enable after performance optimizations (see beads issue wrk-fyx)
bench.skip(
'stress test: Promise.race with 1000 concurrent steps',
async () => {
const { runId } = await triggerWorkflow(
'promiseRaceStressTestLargeWorkflow',
[1000]
);
const { run } = await getWorkflowReturnValue(runId);
stageTiming('stress test: Promise.race with 1000 concurrent steps', run);
},
{ time: 120000, iterations: 1, warmupIterations: 0, teardown }
);
}
}
});
Loading
Loading