Commit 8ffac8a

feat: Enhance workflow concurrency management with validation and atomicity
1 parent 61b9b92 commit 8ffac8a

9 files changed

Lines changed: 621 additions & 182 deletions

ARCHITECTURE.md

Lines changed: 10 additions & 4 deletions
@@ -301,9 +301,12 @@ attempt is persisted individually as a `step_attempt`.

 Workers are configured with a concurrency limit (e.g., 10). A worker will
 maintain up to 10 in-flight workflow runs simultaneously. It polls for new work
-only when it has available capacity. The Backend's atomic `dequeue` operation
-(`FOR UPDATE SKIP LOCKED`) ensures that multiple workers can poll the same table
-without race conditions or processing the same run twice.
+only when it has available capacity. Claim atomicity is backend-specific:
+
+- Postgres uses `FOR UPDATE SKIP LOCKED` plus advisory locks for constrained
+  buckets.
+- SQLite uses transaction-level single-writer locking (`BEGIN IMMEDIATE`) to
+  serialize claim writes.

 ### 5.3. Workflow-Run Concurrency

@@ -327,7 +330,7 @@ defineWorkflow(

 `key` and `limit` can each be either static values (`string`/`number`) or
 functions of the validated workflow input. They are resolved once when the run
-is created and persisted on the `workflow_run`.
+is created and persisted on `workflow_runs`.
 Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected.

 During claim/dequeue, a run is claimable only when the number of active leased
@@ -341,6 +344,9 @@ is:

 `pending`, `sleeping`, and expired-lease `running` runs do not consume
 concurrency slots.
+For active runs in a bucket (`pending`, `running`), the resolved
+`concurrency_limit` is required to be consistent; conflicting limits are
+rejected at run creation.

 ### 5.4. Handling Crashes During Parallel Execution
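
The slot rule described in the ARCHITECTURE.md hunks above can be sketched as follows. This is a minimal illustration, assuming a run is claimable when the number of actively leased `running` runs in its bucket is strictly below the resolved limit (the exact condition is elided by the diff context). The `RunRow` shape, the `leaseExpiresAt` field, and both function names are hypothetical and are not taken from the repository.

```typescript
// Hypothetical row shape; field names are illustrative, not the real schema.
type RunStatus = "pending" | "running" | "sleeping";

interface RunRow {
  status: RunStatus;
  // Lease expiry as epoch milliseconds; null when the run holds no lease.
  leaseExpiresAt: number | null;
}

// A run consumes a slot only while it is `running` with an unexpired lease.
function consumesSlot(run: RunRow, now: number): boolean {
  return (
    run.status === "running" &&
    run.leaseExpiresAt !== null &&
    run.leaseExpiresAt > now
  );
}

// Assumed rule: claimable while consumed slots are strictly below the limit.
function isClaimable(bucketRuns: RunRow[], limit: number, now: number): boolean {
  const used = bucketRuns.filter((run) => consumesSlot(run, now)).length;
  return used < limit;
}

const now = 1000;
const bucketRuns: RunRow[] = [
  { status: "running", leaseExpiresAt: 2000 }, // active lease: consumes a slot
  { status: "running", leaseExpiresAt: 500 }, // expired lease: does not
  { status: "pending", leaseExpiresAt: null }, // pending: does not
  { status: "sleeping", leaseExpiresAt: null }, // sleeping: does not
];

const claimableAtLimitTwo = isClaimable(bucketRuns, 2, now);
const claimableAtLimitOne = isClaimable(bucketRuns, 1, now);
```

Only the first row counts toward the limit, so the bucket has room at a limit of 2 and is full at a limit of 1.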

packages/docs/docs/workers.mdx

Lines changed: 4 additions & 0 deletions
@@ -119,6 +119,8 @@ Only active leased `running` runs consume workflow-concurrency slots.
 Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected.
 Sleeping runs are non-consuming until they are claimed again as actively leased
 `running` runs.
+Within active `pending` and actively leased `running` runs for the same
+workflow+version+key bucket, the resolved `limit` must remain consistent.

 ## Heartbeats and Crash Recovery

@@ -166,6 +168,8 @@ Workers coordinate through the database:
 - Each workflow run is claimed by exactly one worker at a time
 - Workers use atomic database operations to prevent duplicate processing
 - If a worker crashes, its workflows become available to other workers
+- SQLite relies on transaction-level single-writer locking (`BEGIN IMMEDIATE`)
+  while Postgres uses row locks plus advisory locks for constrained buckets

 ## Graceful Shutdown
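
The coordination bullets in the workers.mdx hunk above (one owner at a time, crashed workers' runs becoming available again) can be illustrated with a small lease model. This is a single-threaded, in-memory sketch only: it does not model the database locking (`FOR UPDATE SKIP LOCKED`, `BEGIN IMMEDIATE`) that makes the real claim atomic, and `LeasedRun`, `claimNext`, and the 30-second lease are assumptions made for illustration.

```typescript
// In-memory model only; the real backends do this inside a database transaction.
interface LeasedRun {
  id: string;
  status: "pending" | "running";
  workerId: string | null;
  leaseExpiresAt: number | null; // epoch milliseconds
}

// Claim the first run that is pending or whose lease has expired.
// Returns the claimed run, or null when nothing is available.
function claimNext(
  runs: LeasedRun[],
  workerId: string,
  now: number,
  leaseMs: number,
): LeasedRun | null {
  const run = runs.find(
    (r) =>
      r.status === "pending" ||
      (r.status === "running" &&
        r.leaseExpiresAt !== null &&
        r.leaseExpiresAt <= now),
  );
  if (run === undefined) return null;
  run.status = "running";
  run.workerId = workerId;
  run.leaseExpiresAt = now + leaseMs;
  return run;
}

const runs: LeasedRun[] = [
  { id: "run-1", status: "pending", workerId: null, leaseExpiresAt: null },
];

// worker-a claims run-1 at t=0 and then stops heartbeating (simulated crash).
const firstClaim = claimNext(runs, "worker-a", 0, 30000);
const ownerAfterFirstClaim = firstClaim === null ? null : firstClaim.workerId;

// worker-b finds nothing while the lease is still active.
const secondClaim = claimNext(runs, "worker-b", 1000, 30000);

// Once the lease has expired, worker-b can take over the same run.
const thirdClaim = claimNext(runs, "worker-b", 31000, 30000);
```

In this model a heartbeat would simply push `leaseExpiresAt` forward; a worker that stops doing so loses the run once the lease lapses.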

packages/docs/docs/workflows.mdx

Lines changed: 2 additions & 0 deletions
@@ -213,6 +213,8 @@ defineWorkflow(
 - key must resolve to a non-empty string
 - limit must resolve to a positive integer
 - resolved keys are stored verbatim; only empty/all-whitespace keys are rejected
+- within active runs (`pending`/`running`) for the same
+  workflow+version+key bucket, `limit` must remain consistent

 When concurrency is configured, runs in the same bucket are constrained by:
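
The resolution rules listed in the workflows.mdx hunk above (static value or function of the input, non-empty key, positive integer limit, key stored verbatim) can be sketched as below. `ConcurrencySpec`, `Resolvable`, and `resolveConcurrency` are hypothetical names chosen for this illustration; the actual `defineWorkflow` option shape is not shown in this diff.

```typescript
// Hypothetical spec shape: each field is a static value or a function of input.
type Resolvable<Input, T> = T | ((input: Input) => T);

interface ConcurrencySpec<Input> {
  key: Resolvable<Input, string>;
  limit: Resolvable<Input, number>;
}

interface ResolvedConcurrency {
  key: string;
  limit: number;
}

// Resolve both fields once against the validated input, then validate them.
function resolveConcurrency<Input>(
  spec: ConcurrencySpec<Input>,
  input: Input,
): ResolvedConcurrency {
  const key = typeof spec.key === "function" ? spec.key(input) : spec.key;
  const limit =
    typeof spec.limit === "function" ? spec.limit(input) : spec.limit;

  // Only empty or all-whitespace keys are rejected.
  if (key.trim().length === 0) {
    throw new Error("key must resolve to a non-empty string");
  }
  if (!Number.isInteger(limit) || limit <= 0) {
    throw new Error("limit must resolve to a positive integer");
  }
  // The key is returned verbatim, without trimming.
  return { key, limit };
}

const resolved = resolveConcurrency(
  { key: (input: { tenantId: string }) => `tenant:${input.tenantId}`, limit: 2 },
  { tenantId: "acme" },
);

const padded = resolveConcurrency({ key: " foo ", limit: 1 }, {});
```

Here `trim()` is used only to detect blank keys; the padded key `" foo "` is kept as-is and therefore names a different bucket than `"foo"`.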

packages/openworkflow/README.md

Lines changed: 2 additions & 0 deletions
@@ -98,6 +98,8 @@ Keys are stored verbatim (for example, `" foo "` and `"foo"` are different
 concurrency keys); only empty or all-whitespace keys are rejected.
 Sleeping runs do not consume workflow-concurrency slots until they are claimed
 again as actively leased `running` runs.
+For a given active bucket (`workflow + version + key`), the resolved `limit`
+must stay consistent across `pending`/`running` runs.

 ## Documentation
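
The limit-consistency rule added to the README above can be sketched as a check performed when a run is created. This is an illustration under assumptions: `ActiveRun`, `sameBucket`, and `assertConsistentLimit` are hypothetical names, lease state is not modeled, and the real backends presumably enforce the rule inside the database. The error text is the `CONCURRENCY_LIMIT_MISMATCH_ERROR` string from this commit.

```typescript
// Hypothetical view of an active (`pending` or `running`) constrained run.
interface ActiveRun {
  workflowName: string;
  version: string | null;
  concurrencyKey: string;
  concurrencyLimit: number;
}

// Message copied from CONCURRENCY_LIMIT_MISMATCH_ERROR in this commit.
const LIMIT_MISMATCH_MESSAGE =
  'Invalid workflow concurrency metadata: active runs in the same bucket must use the same "concurrencyLimit".';

// Bucket identity is workflow + version + key; keys are compared verbatim.
function sameBucket(a: ActiveRun, b: ActiveRun): boolean {
  return (
    a.workflowName === b.workflowName &&
    a.version === b.version &&
    a.concurrencyKey === b.concurrencyKey
  );
}

// Reject a new run whose limit conflicts with an active run in its bucket.
function assertConsistentLimit(active: ActiveRun[], candidate: ActiveRun): void {
  const conflict = active.some(
    (run) =>
      sameBucket(run, candidate) &&
      run.concurrencyLimit !== candidate.concurrencyLimit,
  );
  if (conflict) {
    throw new Error(LIMIT_MISMATCH_MESSAGE);
  }
}

const active: ActiveRun[] = [
  { workflowName: "sync", version: "v1", concurrencyKey: "foo", concurrencyLimit: 2 },
];

// Same bucket, same limit: accepted.
assertConsistentLimit(active, {
  workflowName: "sync", version: "v1", concurrencyKey: "foo", concurrencyLimit: 2,
});

// `" foo "` is a different key, so a different limit is accepted there.
assertConsistentLimit(active, {
  workflowName: "sync", version: "v1", concurrencyKey: " foo ", concurrencyLimit: 5,
});
```

Passing a candidate with key `"foo"` and a limit other than 2 would throw, because an active run in that bucket already uses a limit of 2.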

Lines changed: 114 additions & 0 deletions
@@ -0,0 +1,114 @@
+import type { CreateWorkflowRunParams } from "./backend.js";
+
+const INVALID_CONCURRENCY_KEY_TYPE_ERROR =
+  'Invalid workflow concurrency metadata: "concurrencyKey" must be a string or null.';
+export const INVALID_CONCURRENCY_KEY_VALUE_ERROR =
+  'Invalid workflow concurrency metadata: "concurrencyKey" must be a non-empty string when provided.';
+const INVALID_CONCURRENCY_LIMIT_TYPE_ERROR =
+  'Invalid workflow concurrency metadata: "concurrencyLimit" must be a number or null.';
+export const INVALID_CONCURRENCY_LIMIT_VALUE_ERROR =
+  'Invalid workflow concurrency metadata: "concurrencyLimit" must be a positive integer or null.';
+const INVALID_CONCURRENCY_PAIR_ERROR =
+  'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.';
+export const CONCURRENCY_LIMIT_MISMATCH_ERROR =
+  'Invalid workflow concurrency metadata: active runs in the same bucket must use the same "concurrencyLimit".';
+
+/**
+ * Bucket identity for workflow-level concurrency.
+ */
+export interface ConcurrencyBucket {
+  workflowName: string;
+  version: string | null;
+  key: string;
+  limit: number;
+}
+
+/**
+ * Normalize and validate workflow concurrency metadata passed to create calls.
+ * This protects direct backend callers that bypass client-side validation.
+ * @param params - Workflow run creation params
+ * @returns Params with normalized concurrency fields
+ * @throws {Error} When concurrency metadata has invalid shape or values
+ */
+export function normalizeCreateWorkflowRunParams(
+  params: CreateWorkflowRunParams,
+): CreateWorkflowRunParams {
+  const rawParams = params as unknown as Record<string, unknown>;
+  const rawConcurrencyKey = rawParams["concurrencyKey"];
+  const rawConcurrencyLimit = rawParams["concurrencyLimit"];
+
+  if (rawConcurrencyKey === undefined && rawConcurrencyLimit === undefined) {
+    return {
+      ...params,
+      concurrencyKey: null,
+      concurrencyLimit: null,
+    };
+  }
+
+  if (
+    rawConcurrencyKey !== undefined &&
+    rawConcurrencyKey !== null &&
+    typeof rawConcurrencyKey !== "string"
+  ) {
+    throw new Error(INVALID_CONCURRENCY_KEY_TYPE_ERROR);
+  }
+
+  if (
+    rawConcurrencyLimit !== undefined &&
+    rawConcurrencyLimit !== null &&
+    typeof rawConcurrencyLimit !== "number"
+  ) {
+    throw new Error(INVALID_CONCURRENCY_LIMIT_TYPE_ERROR);
+  }
+
+  const concurrencyKey =
+    rawConcurrencyKey === undefined ? null : rawConcurrencyKey;
+  const concurrencyLimit =
+    rawConcurrencyLimit === undefined ? null : rawConcurrencyLimit;
+
+  if ((concurrencyKey === null) !== (concurrencyLimit === null)) {
+    throw new Error(INVALID_CONCURRENCY_PAIR_ERROR);
+  }
+
+  if (
+    typeof concurrencyKey === "string" &&
+    concurrencyKey.trim().length === 0
+  ) {
+    throw new Error(INVALID_CONCURRENCY_KEY_VALUE_ERROR);
+  }
+
+  if (
+    typeof concurrencyLimit === "number" &&
+    (!Number.isFinite(concurrencyLimit) ||
+      !Number.isInteger(concurrencyLimit) ||
+      concurrencyLimit <= 0)
+  ) {
+    throw new Error(INVALID_CONCURRENCY_LIMIT_VALUE_ERROR);
+  }
+
+  return {
+    ...params,
+    concurrencyKey,
+    concurrencyLimit,
+  };
+}
+
+/**
+ * Return bucket identity for constrained runs, otherwise null.
+ * @param params - Normalized workflow run creation params
+ * @returns Concurrency bucket or null for unconstrained runs
+ */
+export function toConcurrencyBucket(
+  params: CreateWorkflowRunParams,
+): ConcurrencyBucket | null {
+  if (params.concurrencyKey === null || params.concurrencyLimit === null) {
+    return null;
+  }
+
+  return {
+    workflowName: params.workflowName,
+    version: params.version,
+    key: params.concurrencyKey,
+    limit: params.concurrencyLimit,
+  };
+}
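
The value checks in the new file above can be exercised with a condensed stand-in. This sketch does not import the real module: `Params` models only the two concurrency fields, `normalize` restates the pair, key, and limit checks without the runtime type checks, and the short error messages are placeholders rather than the constants from the commit.

```typescript
// Stand-in for the real params type; only the concurrency fields are modeled.
interface Params {
  workflowName: string;
  concurrencyKey?: string | null;
  concurrencyLimit?: number | null;
}

interface NormalizedParams {
  workflowName: string;
  concurrencyKey: string | null;
  concurrencyLimit: number | null;
}

// Condensed restatement of the value checks; undefined is normalized to null.
function normalize(params: Params): NormalizedParams {
  const concurrencyKey = params.concurrencyKey ?? null;
  const concurrencyLimit = params.concurrencyLimit ?? null;

  // Key and limit must be set together or omitted together.
  if ((concurrencyKey === null) !== (concurrencyLimit === null)) {
    throw new Error("pair");
  }
  if (concurrencyKey !== null && concurrencyKey.trim().length === 0) {
    throw new Error("key");
  }
  if (
    concurrencyLimit !== null &&
    (!Number.isInteger(concurrencyLimit) || concurrencyLimit <= 0)
  ) {
    throw new Error("limit");
  }
  return { workflowName: params.workflowName, concurrencyKey, concurrencyLimit };
}

// Summarize each case as "ok" or the name of the check that rejected it.
function outcome(params: Params): string {
  try {
    normalize(params);
    return "ok";
  } catch (error) {
    return `rejected: ${(error as Error).message}`;
  }
}

const outcomes = [
  outcome({ workflowName: "sync" }), // unconstrained run
  outcome({ workflowName: "sync", concurrencyKey: "a", concurrencyLimit: 3 }),
  outcome({ workflowName: "sync", concurrencyKey: "a" }), // limit missing
  outcome({ workflowName: "sync", concurrencyKey: "  ", concurrencyLimit: 3 }),
  outcome({ workflowName: "sync", concurrencyKey: "a", concurrencyLimit: 0 }),
];
```

The pair check runs before the value checks, mirroring the order in the diff, so a key without a limit is reported as a pairing problem rather than a limit problem.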
