Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,19 @@ jobs:
DEPLOYMENT_URL: "http://localhost:${{ matrix.app.name == 'sveltekit' && '4173' || (matrix.app.name == 'astro' && '4321' || '3000') }}"
NEXT_CANARY: ${{ matrix.app.canary && '1' || '' }}

- name: Run Low-Concurrency Worker-Slot Test
if: ${{ !matrix.app.canary && matrix.app.name == 'nextjs-turbopack' }}
run: |
cd "${{ steps.prepare-workbench.outputs.workbench_app_path }}" && PORT=3001 WORKFLOW_POSTGRES_WORKER_CONCURRENCY=1 pnpm start &
echo "starting low-concurrency tests in 10 seconds" && sleep 10
pnpm vitest run packages/core/e2e/e2e.test.ts -t "frees worker slots for unrelated workflows while a waiter is blocked"
env:
NODE_OPTIONS: "--enable-source-maps"
APP_NAME: ${{ matrix.app.name }}
WORKBENCH_APP_PATH: ${{ steps.prepare-workbench.outputs.workbench_app_path }}
DEPLOYMENT_URL: "http://localhost:3001"
WORKFLOW_LIMITS_LOW_CONCURRENCY: "1"

- name: Generate E2E summary
if: always()
run: node .github/scripts/aggregate-e2e-results.js . --job-name "E2E Local Postgres (${{ matrix.app.name }})" >> $GITHUB_STEP_SUMMARY || true
Expand Down
195 changes: 188 additions & 7 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ import {
expect,
test,
} from 'vitest';
import type { Run } from '../src/runtime';
import { createLimitsRuntimeSuite } from '../../world-testing/src/limits-runtime.js';
import type { Run, StartOptions } from '../src/runtime.js';
import {
cancelRun,
getHookByToken,
getRun,
getWorld,
healthCheck,
start as rawStart,
resumeHook,
} from '../src/runtime';
} from '../src/runtime.js';
import {
cliCancel,
cliHealthJson,
Expand All @@ -49,10 +51,25 @@ if (!deploymentUrl) {
* Tracked wrapper around start() that automatically registers runs
* for diagnostics on test failure and observability metadata collection.
*/
async function start<T>(
...args: Parameters<typeof rawStart<T>>
): Promise<Run<T>> {
const run = await rawStart<T>(...args);
type E2EWorkflowMetadata = Awaited<ReturnType<typeof getWorkflowMetadata>>;

async function start<TResult = any>(
workflow: E2EWorkflowMetadata,
options?: StartOptions
): Promise<Run<TResult>>;
async function start<TArgs extends unknown[], TResult = any>(
workflow: E2EWorkflowMetadata,
args: TArgs,
options?: StartOptions
): Promise<Run<TResult>>;
async function start<TResult = any>(
workflow: E2EWorkflowMetadata,
argsOrOptions?: unknown[] | StartOptions,
options?: StartOptions
): Promise<Run<TResult>> {
const run = Array.isArray(argsOrOptions)
? await rawStart<unknown[], TResult>(workflow, argsOrOptions, options)
: await rawStart<TResult>(workflow, argsOrOptions);
trackRun(run);
return run;
}
Expand Down Expand Up @@ -220,11 +237,175 @@ describe('e2e', () => {

const isNext = process.env.APP_NAME?.includes('nextjs');
const isLocal = deploymentUrl.includes('localhost');
const isPostgresWorld =
process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres';
const isLocalWorld = isLocalDeployment() && !isPostgresWorld;
// only works with framework that transpiles react and
// doesn't work on Vercel due to eval hack so react isn't
// bundled in function
const shouldSkipReactRenderTest = !(isNext && isLocal);

if (isLocalWorld || isPostgresWorld) {
createLimitsRuntimeSuite(
`limits runtime (${isPostgresWorld ? 'postgres' : 'local'})`,
async () => ({
async runWorkflowWithScopedLocks(userId) {
const run = await start(await e2e('workflowWithScopedLocks'), [
userId,
]);
return await run.returnValue;
},
async runWorkflowLockContention(userId, holdMs) {
const workflow = await e2e('workflowLockContentionWorkflow');
const runA = await start(workflow, [userId, holdMs]);
await sleep(100);
const runB = await start(workflow, [userId, holdMs]);
return await Promise.all([runA.returnValue, runB.returnValue]);
},
async runLockedStepCallContention(
key,
holdMs,
labelA = 'A',
labelB = 'B'
) {
const workflow = await e2e('lockedStepCallContentionWorkflow');
const runA = await start(workflow, [key, holdMs, labelA]);
await sleep(100);
const runB = await start(workflow, [key, holdMs, labelB]);
return await Promise.all([runA.returnValue, runB.returnValue]);
},
async runWorkflowLockAcrossSuspension(userId, holdMs) {
const workflow = await e2e('workflowOnlyLockContentionWorkflow');
const runA = await start(workflow, [userId, holdMs, 'A']);
await sleep(100);
const runB = await start(workflow, [userId, holdMs, 'B']);
return await Promise.all([runA.returnValue, runB.returnValue]);
},
async runWorkflowExpiredLeaseRecovery(userId, leaseTtlMs) {
const leakedWorkflow = await e2e('workflowLeakedLockWorkflow');
const waiterWorkflow = await e2e(
'workflowOnlyLockContentionWorkflow'
);
const leakedRun = await start(leakedWorkflow, [
userId,
leaseTtlMs,
'A',
]);
const leakedResult = await leakedRun.returnValue;
const waiterRun = await start(waiterWorkflow, [userId, 0, 'B']);
const waiterResult = await waiterRun.returnValue;
return [leakedResult, waiterResult];
},
async runLeakedKeyExpiredLeaseRecovery(userId, leaseTtlMs) {
const leakedWorkflow = await e2e('leakedKeyLockWorkflow');
const waiterWorkflow = await e2e('lockedStepCallContentionWorkflow');
const leakedRun = await start(leakedWorkflow, [
userId,
leaseTtlMs,
'A',
]);
const leakedResult = await leakedRun.returnValue;
const waiterRun = await start(waiterWorkflow, [
leakedResult.key,
0,
'B',
]);
const waiterResult = await waiterRun.returnValue;
return [leakedResult, waiterResult];
},
async runWorkflowMixedLimitContention(userId, holdMs, periodMs) {
const workflow = await e2e('workflowMixedLimitContentionWorkflow');
const runA = await start(workflow, [userId, holdMs, periodMs, 'A']);
await sleep(100);
const runB = await start(workflow, [userId, holdMs, periodMs, 'B']);
return await Promise.all([runA.returnValue, runB.returnValue]);
},
async runWorkflowFifoThreeWaiters(userId, holdMs) {
const workflow = await e2e('workflowOnlyLockContentionWorkflow');
const runA = await start(workflow, [userId, holdMs, 'A']);
await sleep(100);
const runB = await start(workflow, [userId, holdMs, 'B']);
await sleep(100);
const runC = await start(workflow, [userId, holdMs, 'C']);
return await Promise.all([
runA.returnValue,
runB.returnValue,
runC.returnValue,
]);
},
async runCancelledWorkflowWaiter(userId, holdMs) {
const workflow = await e2e('workflowOnlyLockContentionWorkflow');
const runA = await start(workflow, [userId, holdMs, 'A']);
await sleep(100);
const runB = await start(workflow, [userId, holdMs, 'B']);
await sleep(100);
await cancelRun(getWorld(), runB.runId);
const cancelledError = await runB.returnValue.catch((error) => error);
const runC = await start(workflow, [userId, holdMs, 'C']);
const [resultA, resultC] = await Promise.all([
runA.returnValue,
runC.returnValue,
]);
return { cancelledError, resultA, resultC };
},
async runIndependentWorkflowKeys(holdMs) {
const workflow = await e2e('workflowOnlyLockContentionWorkflow');
const runA = await start(workflow, ['user-a', holdMs]);
await sleep(100);
const runB = await start(workflow, ['user-b', holdMs]);
return await Promise.all([runA.returnValue, runB.returnValue]);
},
async runIndependentStepKeys(holdMs) {
const workflow = await e2e('lockedStepCallContentionWorkflow');
const runA = await start(workflow, [
'step:db:isolation:a',
holdMs,
'A',
]);
await sleep(100);
const runB = await start(workflow, [
'step:db:isolation:b',
holdMs,
'B',
]);
return await Promise.all([runA.returnValue, runB.returnValue]);
},
async runBlockedWaiterWithUnrelatedWorkflow(holdMs) {
const workflow = await e2e('workflowOnlyLockContentionWorkflow');
const runA = await start(workflow, [
'worker-slot-shared',
holdMs,
'A',
]);
await sleep(100);
const runB = await start(workflow, [
'worker-slot-shared',
holdMs,
'B',
]);
await sleep(100);
const runC = await start(workflow, [
'worker-slot-unrelated',
Math.max(100, Math.floor(holdMs / 4)),
'C',
]);

const [holder, waiter, unrelated] = await Promise.all([
runA.returnValue,
runB.returnValue,
runC.returnValue,
]);
return { holder, waiter, unrelated };
},
async runWorkflowSingleLockAcrossMultipleSteps(holdMs) {
const workflow = await e2e('singleLockAcrossMultipleStepsWorkflow');
const run = await start(workflow, ['step:db:batch', holdMs]);
return await run.returnValue;
},
})
);
}

test.skipIf(shouldSkipReactRenderTest)(
'should work with react rendering in step',
async () => {
Expand Down Expand Up @@ -1776,7 +1957,7 @@ describe('e2e', () => {
// Cancel the run using the core runtime cancelRun function.
// This exercises the same cancelRun code path that the CLI uses
// (the CLI delegates directly to this function).
const { cancelRun } = await import('../src/runtime');
const { cancelRun } = await import('../src/runtime.js');
await cancelRun(getWorld(), run.runId);

// Verify the run was cancelled - returnValue should throw WorkflowRunCancelledError
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/async-deserialization-ordering.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
onUnconsumedEvent: () => {},
getPromiseQueue: () => Promise.resolve(),
}),
nextLockIndex: 0,
invocationsQueue: new Map(),
generateUlid: () => ulid(workflowStartedAt),
generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) =>
Expand Down
13 changes: 11 additions & 2 deletions packages/core/src/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@ export interface WaitInvocationQueueItem {
hasCreatedEvent?: boolean;
}

export interface LimitWaitInvocationQueueItem {
type: 'limit_wait';
correlationId: string;
resumeAt: Date;
}

export type QueueItem =
| StepInvocationQueueItem
| HookInvocationQueueItem
| WaitInvocationQueueItem;
| WaitInvocationQueueItem
| LimitWaitInvocationQueueItem;

/**
* An error that is thrown when one or more operations (steps/hooks/etc.) are called but do
Expand Down Expand Up @@ -61,7 +68,9 @@ export class WorkflowSuspension extends Error {
else if (item.type === 'hook') {
if (item.disposed) hookDisposedCount++;
else hookCount++;
} else if (item.type === 'wait') waitCount++;
} else if (item.type === 'wait' || item.type === 'limit_wait') {
waitCount++;
}
}

// Build description parts
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/hook-sleep-interaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
onUnconsumedEvent: () => {},
getPromiseQueue: () => promiseQueueHolder.current,
}),
nextLockIndex: 0,
invocationsQueue: new Map(),
generateUlid: () => ulid(workflowStartedAt),
generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) =>
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ export {
type WebhookOptions,
} from './create-hook.js';
export { defineHook, type TypedHook } from './define-hook.js';
export {
lock,
type LockHandle,
type LockOptions,
LIMITS_NOT_IMPLEMENTED_MESSAGE,
} from './lock.js';
export { sleep } from './sleep.js';
export {
getStepMetadata,
Expand Down
64 changes: 64 additions & 0 deletions packages/core/src/lock.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
import {
lock,
LIMITS_NOT_IMPLEMENTED_MESSAGE,
LOCK_WORKFLOW_ONLY_MESSAGE,
} from './lock.js';
import { contextStorage } from './step/context-storage.js';
import { WORKFLOW_LOCK } from './symbols.js';

afterEach(() => {
delete (globalThis as any)[WORKFLOW_LOCK];
});

describe('lock', () => {
it('throws when called outside workflow or step execution context', async () => {
await expect(
lock({
key: 'workflow:user:test',
concurrency: { max: 1 },
})
).rejects.toThrow(LIMITS_NOT_IMPLEMENTED_MESSAGE);
});

it('prefers the workflow runtime lock when both runtimes are present', async () => {
const workflowHandle = { leaseId: 'lease_workflow' };
const workflowLock = vi.fn().mockResolvedValue(workflowHandle);
(globalThis as any)[WORKFLOW_LOCK] = workflowLock;
const options = {
key: 'workflow:user:test',
concurrency: { max: 1 },
};

await expect(lock(options)).resolves.toBe(workflowHandle);
expect(workflowLock).toHaveBeenCalledWith(options);
});

it('throws a workflow-only error when called inside a step context', async () => {
const options = {
key: 'step:db:cheap',
concurrency: { max: 2 },
};

await expect(
contextStorage.run(
{
stepMetadata: {
stepId: 'step_test',
stepName: 'testStep',
stepStartedAt: new Date(),
attempt: 1,
},
workflowMetadata: {
workflowName: 'testWorkflow',
workflowRunId: 'wrun_test',
workflowStartedAt: new Date(),
url: 'http://localhost:3000',
},
ops: [],
},
() => lock(options)
)
).rejects.toThrow(LOCK_WORKFLOW_ONLY_MESSAGE);
});
});
Loading