Skip to content

Commit 057d0a4

Browse files
committed
feat: Update workflow concurrency to support optional keys and default bucket usage
1 parent 8ffac8a commit 057d0a4

15 files changed

Lines changed: 248 additions & 61 deletions

ARCHITECTURE.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,11 @@ defineWorkflow(
329329
```
330330

331331
`key` and `limit` can each be either static values (`string`/`number`) or
332-
functions of the validated workflow input. They are resolved once when the run
333-
is created and persisted on `workflow_runs`.
332+
functions of the validated workflow input, and `key` is optional. They are
333+
resolved once when the run is created and persisted on `workflow_runs`.
334334
Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected.
335+
When `key` is omitted, the run uses the default bucket for
336+
`namespace_id + workflow_name + version`.
335337

336338
During claim/dequeue, a run is claimable only when the number of active leased
337339
`running` runs in the same bucket is below the run's `limit`. The bucket scope
@@ -340,7 +342,7 @@ is:
340342
- `namespace_id`
341343
- `workflow_name`
342344
- `version` (version-aware buckets)
343-
- `concurrency_key`
345+
- `concurrency_key` (nullable for the default bucket)
344346

345347
`pending`, `sleeping`, and expired-lease `running` runs do not consume
346348
concurrency slots.

packages/docs/docs/workers.mdx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ defineWorkflow(
9898
{
9999
name: "process-order",
100100
concurrency: {
101-
key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme"
101+
key: ({ input }) => `tenant:${input.tenantId}`, // optional
102102
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
103103
},
104104
},
@@ -113,7 +113,7 @@ Workers will only claim a run when the bucket has capacity. Bucket scope is:
113113
- namespace
114114
- workflow name
115115
- workflow version
116-
- resolved concurrency key
116+
- resolved concurrency key (or default bucket when key is omitted)
117117

118118
Only active leased `running` runs consume workflow-concurrency slots.
119119
Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected.

packages/docs/docs/workflows.mdx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ defineWorkflow(
198198
{
199199
name: "process-order",
200200
concurrency: {
201-
key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme"
201+
key: ({ input }) => `tenant:${input.tenantId}`, // optional
202202
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
203203
},
204204
},
@@ -208,9 +208,9 @@ defineWorkflow(
208208
);
209209
```
210210

211-
- `key` can be a string or a function `({ input }) => string`
211+
- `key` is optional; when set it can be a string or `({ input }) => string`
212212
- `limit` can be a number or a function `({ input }) => number`
213-
- key must resolve to a non-empty string
213+
- if provided, key must resolve to a non-empty string
214214
- limit must resolve to a positive integer
215215
- resolved keys are stored verbatim; only empty/all-whitespace keys are rejected
216216
- within active runs (`pending`/`running`) for the same
@@ -221,7 +221,7 @@ When concurrency is configured, runs in the same bucket are constrained by:
221221
- namespace
222222
- workflow name
223223
- workflow version
224-
- resolved `key`
224+
- resolved `key` (or the default bucket when key is omitted)
225225

226226
Only actively leased `running` runs consume slots. `pending`, `sleeping`, and
227227
expired-lease runs do not.

packages/openworkflow/README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ For more details, check out our [docs](https://openworkflow.dev/docs).
6767
-**Long pauses** - Sleep for seconds or months
6868
-**Scheduled runs** - Start workflows at a specific time
6969
-**Parallel execution** - Run steps concurrently
70-
-**Workflow concurrency** - Limit active runs by key (static or input-based)
70+
-**Workflow concurrency** - Limit active runs by bucket (optional key)
7171
-**Idempotency keys** - Deduplicate repeated run requests (24h window)
7272
-**No extra servers** - Uses your existing database
7373
-**Dashboard included** - Monitor and debug workflows
@@ -82,7 +82,7 @@ const workflow = defineWorkflow(
8282
{
8383
name: "process-order",
8484
concurrency: {
85-
key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme"
85+
key: ({ input }) => `tenant:${input.tenantId}`, // optional
8686
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
8787
},
8888
},
@@ -92,8 +92,10 @@ const workflow = defineWorkflow(
9292
);
9393
```
9494

95-
`key` must resolve to a non-empty string and `limit` must resolve to a positive
96-
integer. Invalid values fail run creation.
95+
`limit` must resolve to a positive integer. If `key` is provided, it must
96+
resolve to a non-empty string. Invalid values fail run creation.
97+
When `key` is omitted, runs use the default bucket for
98+
`namespace + workflow + version`.
9799
Keys are stored verbatim (for example, `" foo "` and `"foo"` are different
98100
concurrency keys); only empty or all-whitespace keys are rejected.
99101
Sleeping runs do not consume workflow-concurrency slots until they are claimed

packages/openworkflow/backend-concurrency.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ const INVALID_CONCURRENCY_LIMIT_TYPE_ERROR =
99
export const INVALID_CONCURRENCY_LIMIT_VALUE_ERROR =
1010
'Invalid workflow concurrency metadata: "concurrencyLimit" must be a positive integer or null.';
1111
const INVALID_CONCURRENCY_PAIR_ERROR =
12-
'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.';
12+
'Invalid workflow concurrency metadata: "concurrencyLimit" must be set when "concurrencyKey" is provided.';
1313
export const CONCURRENCY_LIMIT_MISMATCH_ERROR =
1414
'Invalid workflow concurrency metadata: active runs in the same bucket must use the same "concurrencyLimit".';
1515

@@ -19,7 +19,7 @@ export const CONCURRENCY_LIMIT_MISMATCH_ERROR =
1919
export interface ConcurrencyBucket {
2020
workflowName: string;
2121
version: string | null;
22-
key: string;
22+
key: string | null;
2323
limit: number;
2424
}
2525

@@ -66,7 +66,7 @@ export function normalizeCreateWorkflowRunParams(
6666
const concurrencyLimit =
6767
rawConcurrencyLimit === undefined ? null : rawConcurrencyLimit;
6868

69-
if ((concurrencyKey === null) !== (concurrencyLimit === null)) {
69+
if (concurrencyKey !== null && concurrencyLimit === null) {
7070
throw new Error(INVALID_CONCURRENCY_PAIR_ERROR);
7171
}
7272

@@ -101,7 +101,7 @@ export function normalizeCreateWorkflowRunParams(
101101
export function toConcurrencyBucket(
102102
params: CreateWorkflowRunParams,
103103
): ConcurrencyBucket | null {
104-
if (params.concurrencyKey === null || params.concurrencyLimit === null) {
104+
if (params.concurrencyLimit === null) {
105105
return null;
106106
}
107107

packages/openworkflow/backend.testsuite.ts

Lines changed: 112 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ export function testBackend(options: TestBackendOptions): void {
173173
expect(created.concurrencyLimit).toBeNull();
174174
});
175175

176-
test("rejects mismatched workflow concurrency metadata pairs", async () => {
176+
test("rejects key-only workflow concurrency metadata", async () => {
177177
const base = {
178178
workflowName: randomUUID(),
179179
version: null,
@@ -193,19 +193,30 @@ export function testBackend(options: TestBackendOptions): void {
193193
await expect(
194194
Promise.resolve().then(() => backend.createWorkflowRun(keyOnly)),
195195
).rejects.toThrow(
196-
'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.',
196+
'Invalid workflow concurrency metadata: "concurrencyLimit" must be set when "concurrencyKey" is provided.',
197197
);
198+
});
199+
200+
test("accepts limit-only workflow concurrency metadata as default bucket", async () => {
201+
const base = {
202+
workflowName: randomUUID(),
203+
version: null,
204+
idempotencyKey: null,
205+
input: null,
206+
config: {},
207+
context: null,
208+
availableAt: null,
209+
deadlineAt: null,
210+
};
198211

199212
const limitOnly = {
200213
...base,
201214
concurrencyKey: null,
202215
concurrencyLimit: 1,
203216
};
204-
await expect(
205-
Promise.resolve().then(() => backend.createWorkflowRun(limitOnly)),
206-
).rejects.toThrow(
207-
'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.',
208-
);
217+
const created = await backend.createWorkflowRun(limitOnly);
218+
expect(created.concurrencyKey).toBeNull();
219+
expect(created.concurrencyLimit).toBe(1);
209220
});
210221

211222
test("rejects invalid workflow concurrency limit values", async () => {
@@ -289,6 +300,35 @@ export function testBackend(options: TestBackendOptions): void {
289300
await teardown(backend);
290301
});
291302

303+
test("rejects mixed concurrency limits for the same active default bucket", async () => {
304+
const backend = await setup();
305+
const workflowName = randomUUID();
306+
const version = "v1";
307+
308+
await createPendingWorkflowRun(backend, {
309+
workflowName,
310+
version,
311+
concurrencyLimit: 1,
312+
});
313+
314+
await expect(
315+
backend.createWorkflowRun({
316+
workflowName,
317+
version,
318+
idempotencyKey: null,
319+
concurrencyKey: null,
320+
concurrencyLimit: 2,
321+
input: null,
322+
config: {},
323+
context: null,
324+
availableAt: null,
325+
deadlineAt: null,
326+
}),
327+
).rejects.toThrow(CONCURRENCY_LIMIT_MISMATCH_ERROR);
328+
329+
await teardown(backend);
330+
});
331+
292332
test("allows changing concurrency limit after terminal runs leave active states", async () => {
293333
const backend = await setup();
294334
const workflowName = randomUUID();
@@ -1034,6 +1074,40 @@ export function testBackend(options: TestBackendOptions): void {
10341074
await teardown(backend);
10351075
});
10361076

1077+
test("enforces concurrency limit for default workflow-version bucket when key is omitted", async () => {
1078+
const backend = await setup();
1079+
const workflowName = randomUUID();
1080+
const version = "v1";
1081+
const concurrencyLimit = 1;
1082+
1083+
await createPendingWorkflowRun(backend, {
1084+
workflowName,
1085+
version,
1086+
concurrencyLimit,
1087+
});
1088+
await createPendingWorkflowRun(backend, {
1089+
workflowName,
1090+
version,
1091+
concurrencyLimit,
1092+
});
1093+
1094+
const firstClaimed = await backend.claimWorkflowRun({
1095+
workerId: randomUUID(),
1096+
leaseDurationMs: 100,
1097+
});
1098+
expect(firstClaimed).not.toBeNull();
1099+
expect(firstClaimed?.concurrencyKey).toBeNull();
1100+
expect(firstClaimed?.concurrencyLimit).toBe(concurrencyLimit);
1101+
1102+
const blocked = await backend.claimWorkflowRun({
1103+
workerId: randomUUID(),
1104+
leaseDurationMs: 100,
1105+
});
1106+
expect(blocked).toBeNull();
1107+
1108+
await teardown(backend);
1109+
});
1110+
10371111
test("supports limits greater than one", async () => {
10381112
const backend = await setup();
10391113
const workflowName = randomUUID();
@@ -1210,6 +1284,37 @@ export function testBackend(options: TestBackendOptions): void {
12101284
await teardown(backend);
12111285
});
12121286

1287+
test("allows claims for different versions in default bucket when key is omitted", async () => {
1288+
const backend = await setup();
1289+
const workflowName = randomUUID();
1290+
1291+
await createPendingWorkflowRun(backend, {
1292+
workflowName,
1293+
version: "v1",
1294+
concurrencyLimit: 1,
1295+
});
1296+
await createPendingWorkflowRun(backend, {
1297+
workflowName,
1298+
version: "v2",
1299+
concurrencyLimit: 1,
1300+
});
1301+
1302+
const firstClaimed = await backend.claimWorkflowRun({
1303+
workerId: randomUUID(),
1304+
leaseDurationMs: 100,
1305+
});
1306+
const secondClaimed = await backend.claimWorkflowRun({
1307+
workerId: randomUUID(),
1308+
leaseDurationMs: 100,
1309+
});
1310+
1311+
expect(firstClaimed).not.toBeNull();
1312+
expect(secondClaimed).not.toBeNull();
1313+
expect(secondClaimed?.id).not.toBe(firstClaimed?.id);
1314+
1315+
await teardown(backend);
1316+
});
1317+
12131318
test("allows claims after the active lease expires", async () => {
12141319
const backend = await setup();
12151320
const workflowName = randomUUID();

packages/openworkflow/client.test.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,25 @@ describe("OpenWorkflow", () => {
354354
expect(handle.workflowRun.concurrencyLimit).toBe(3);
355355
});
356356

357+
test("resolves literal workflow concurrency limit without key", async () => {
358+
const backend = await createBackend();
359+
const client = new OpenWorkflow({ backend });
360+
361+
const workflow = client.defineWorkflow(
362+
{
363+
name: "concurrency-literal-limit-only-test",
364+
concurrency: {
365+
limit: 3,
366+
},
367+
},
368+
noopFn,
369+
);
370+
const handle = await workflow.run({ value: 1 });
371+
372+
expect(handle.workflowRun.concurrencyKey).toBeNull();
373+
expect(handle.workflowRun.concurrencyLimit).toBe(3);
374+
});
375+
357376
test("resolves function workflow concurrency values from parsed input", async () => {
358377
const backend = await createBackend();
359378
const client = new OpenWorkflow({ backend });
@@ -384,6 +403,33 @@ describe("OpenWorkflow", () => {
384403
expect(handle.workflowRun.concurrencyLimit).toBe(4);
385404
});
386405

406+
test("resolves function workflow concurrency limit from parsed input without key", async () => {
407+
const backend = await createBackend();
408+
const client = new OpenWorkflow({ backend });
409+
410+
const schema = z.object({
411+
limit: z.coerce.number().int().positive(),
412+
});
413+
414+
const workflow = client.defineWorkflow(
415+
{
416+
name: "concurrency-function-limit-only-test",
417+
schema,
418+
concurrency: {
419+
limit: ({ input }) => Number(input.limit),
420+
},
421+
},
422+
noopFn,
423+
);
424+
425+
const handle = await workflow.run({
426+
limit: "4",
427+
});
428+
429+
expect(handle.workflowRun.concurrencyKey).toBeNull();
430+
expect(handle.workflowRun.concurrencyLimit).toBe(4);
431+
});
432+
387433
test("throws when resolved workflow concurrency key is invalid", async () => {
388434
const backend = await createBackend();
389435
const client = new OpenWorkflow({ backend });

0 commit comments

Comments
 (0)