Skip to content

Commit 80d1382

Browse files
committed
feat: Update workflow concurrency to support optional keys and default bucket usage
1 parent 28c81b6 commit 80d1382

15 files changed

Lines changed: 248 additions & 61 deletions

File tree

ARCHITECTURE.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,11 @@ defineWorkflow(
352352
```
353353

354354
`key` and `limit` can each be either static values (`string`/`number`) or
355-
functions of the validated workflow input. They are resolved once when the run
356-
is created and persisted on `workflow_runs`.
355+
functions of the validated workflow input, and `key` is optional. They are
356+
resolved once when the run is created and persisted on `workflow_runs`.
357357
Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected.
358+
When `key` is omitted, the run uses the default bucket for
359+
`namespace_id + workflow_name + version`.
358360

359361
During claim/dequeue, a run is claimable only when the number of active leased
360362
`running` runs in the same bucket is below the run's `limit`. The bucket scope
@@ -363,7 +365,7 @@ is:
363365
- `namespace_id`
364366
- `workflow_name`
365367
- `version` (version-aware buckets)
366-
- `concurrency_key`
368+
- `concurrency_key` (nullable for the default bucket)
367369

368370
`pending`, `sleeping`, and expired-lease `running` runs do not consume
369371
concurrency slots.

packages/docs/docs/workers.mdx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ defineWorkflow(
9898
{
9999
name: "process-order",
100100
concurrency: {
101-
key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme"
101+
key: ({ input }) => `tenant:${input.tenantId}`, // optional
102102
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
103103
},
104104
},
@@ -113,7 +113,7 @@ Workers will only claim a run when the bucket has capacity. Bucket scope is:
113113
- namespace
114114
- workflow name
115115
- workflow version
116-
- resolved concurrency key
116+
- resolved concurrency key (or default bucket when key is omitted)
117117

118118
Only active leased `running` runs consume workflow-concurrency slots.
119119
Resolved keys are stored verbatim; only empty/all-whitespace keys are rejected.

packages/docs/docs/workflows.mdx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ defineWorkflow(
198198
{
199199
name: "process-order",
200200
concurrency: {
201-
key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme"
201+
key: ({ input }) => `tenant:${input.tenantId}`, // optional
202202
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
203203
},
204204
},
@@ -208,9 +208,9 @@ defineWorkflow(
208208
);
209209
```
210210

211-
- `key` can be a string or a function `({ input }) => string`
211+
- `key` is optional; when set it can be a string or `({ input }) => string`
212212
- `limit` can be a number or a function `({ input }) => number`
213-
- key must resolve to a non-empty string
213+
- if provided, key must resolve to a non-empty string
214214
- limit must resolve to a positive integer
215215
- resolved keys are stored verbatim; only empty/all-whitespace keys are rejected
216216
- within active runs (`pending`/`running`) for the same
@@ -221,7 +221,7 @@ When concurrency is configured, runs in the same bucket are constrained by:
221221
- namespace
222222
- workflow name
223223
- workflow version
224-
- resolved `key`
224+
- resolved `key` (or the default bucket when key is omitted)
225225

226226
Only actively leased `running` runs consume slots. `pending`, `sleeping`, and
227227
expired-lease runs do not.

packages/openworkflow/README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ For more details, check out our [docs](https://openworkflow.dev/docs).
6868
-**Long pauses** - Sleep for seconds or months
6969
-**Scheduled runs** - Start workflows at a specific time
7070
-**Parallel execution** - Run steps concurrently
71-
-**Workflow concurrency** - Limit active runs by key (static or input-based)
71+
-**Workflow concurrency** - Limit active runs by bucket (optional key)
7272
-**Idempotency keys** - Deduplicate repeated run requests (24h window)
7373
-**No extra servers** - Uses your existing database
7474
-**Dashboard included** - Monitor and debug workflows
@@ -83,7 +83,7 @@ const workflow = defineWorkflow(
8383
{
8484
name: "process-order",
8585
concurrency: {
86-
key: ({ input }) => `tenant:${input.tenantId}`, // or: "tenant:acme"
86+
key: ({ input }) => `tenant:${input.tenantId}`, // optional
8787
limit: ({ input }) => input.maxConcurrentOrders, // or: 5
8888
},
8989
},
@@ -93,8 +93,10 @@ const workflow = defineWorkflow(
9393
);
9494
```
9595

96-
`key` must resolve to a non-empty string and `limit` must resolve to a positive
97-
integer. Invalid values fail run creation.
96+
`limit` must resolve to a positive integer. If `key` is provided, it must
97+
resolve to a non-empty string. Invalid values fail run creation.
98+
When `key` is omitted, runs use the default bucket for
99+
`namespace + workflow + version`.
98100
Keys are stored verbatim (for example, `" foo "` and `"foo"` are different
99101
concurrency keys); only empty or all-whitespace keys are rejected.
100102
Sleeping runs do not consume workflow-concurrency slots until they are claimed

packages/openworkflow/backend-concurrency.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ const INVALID_CONCURRENCY_LIMIT_TYPE_ERROR =
99
export const INVALID_CONCURRENCY_LIMIT_VALUE_ERROR =
1010
'Invalid workflow concurrency metadata: "concurrencyLimit" must be a positive integer or null.';
1111
const INVALID_CONCURRENCY_PAIR_ERROR =
12-
'Invalid workflow concurrency metadata: "concurrencyKey" and "concurrencyLimit" must both be null or both be set.';
12+
'Invalid workflow concurrency metadata: "concurrencyLimit" must be set when "concurrencyKey" is provided.';
1313
export const CONCURRENCY_LIMIT_MISMATCH_ERROR =
1414
'Invalid workflow concurrency metadata: active runs in the same bucket must use the same "concurrencyLimit".';
1515

@@ -19,7 +19,7 @@ export const CONCURRENCY_LIMIT_MISMATCH_ERROR =
1919
export interface ConcurrencyBucket {
2020
workflowName: string;
2121
version: string | null;
22-
key: string;
22+
key: string | null;
2323
limit: number;
2424
}
2525

@@ -66,7 +66,7 @@ export function normalizeCreateWorkflowRunParams(
6666
const concurrencyLimit =
6767
rawConcurrencyLimit === undefined ? null : rawConcurrencyLimit;
6868

69-
if ((concurrencyKey === null) !== (concurrencyLimit === null)) {
69+
if (concurrencyKey !== null && concurrencyLimit === null) {
7070
throw new Error(INVALID_CONCURRENCY_PAIR_ERROR);
7171
}
7272

@@ -101,7 +101,7 @@ export function normalizeCreateWorkflowRunParams(
101101
export function toConcurrencyBucket(
102102
params: CreateWorkflowRunParams,
103103
): ConcurrencyBucket | null {
104-
if (params.concurrencyKey === null || params.concurrencyLimit === null) {
104+
if (params.concurrencyLimit === null) {
105105
return null;
106106
}
107107

packages/openworkflow/client/client.test.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,25 @@ describe("OpenWorkflow", () => {
402402
expect(handle.workflowRun.concurrencyLimit).toBe(3);
403403
});
404404

405+
test("resolves literal workflow concurrency limit without key", async () => {
406+
const backend = await createBackend();
407+
const client = new OpenWorkflow({ backend });
408+
409+
const workflow = client.defineWorkflow(
410+
{
411+
name: "concurrency-literal-limit-only-test",
412+
concurrency: {
413+
limit: 3,
414+
},
415+
},
416+
noopFn,
417+
);
418+
const handle = await workflow.run({ value: 1 });
419+
420+
expect(handle.workflowRun.concurrencyKey).toBeNull();
421+
expect(handle.workflowRun.concurrencyLimit).toBe(3);
422+
});
423+
405424
test("resolves function workflow concurrency values from parsed input", async () => {
406425
const backend = await createBackend();
407426
const client = new OpenWorkflow({ backend });
@@ -432,6 +451,33 @@ describe("OpenWorkflow", () => {
432451
expect(handle.workflowRun.concurrencyLimit).toBe(4);
433452
});
434453

454+
test("resolves function workflow concurrency limit from parsed input without key", async () => {
455+
const backend = await createBackend();
456+
const client = new OpenWorkflow({ backend });
457+
458+
const schema = z.object({
459+
limit: z.coerce.number().int().positive(),
460+
});
461+
462+
const workflow = client.defineWorkflow(
463+
{
464+
name: "concurrency-function-limit-only-test",
465+
schema,
466+
concurrency: {
467+
limit: ({ input }) => Number(input.limit),
468+
},
469+
},
470+
noopFn,
471+
);
472+
473+
const handle = await workflow.run({
474+
limit: "4",
475+
});
476+
477+
expect(handle.workflowRun.concurrencyKey).toBeNull();
478+
expect(handle.workflowRun.concurrencyLimit).toBe(4);
479+
});
480+
435481
test("throws when resolved workflow concurrency key is invalid", async () => {
436482
const backend = await createBackend();
437483
const client = new OpenWorkflow({ backend });

packages/openworkflow/client/client.ts

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -286,25 +286,6 @@ function resolveWorkflowConcurrency<Input>(
286286
};
287287
}
288288

289-
let keyValue: unknown;
290-
try {
291-
keyValue =
292-
typeof concurrency.key === "function"
293-
? concurrency.key({ input })
294-
: concurrency.key;
295-
} catch (error) {
296-
throw new Error(
297-
`Failed to resolve concurrency key for workflow "${workflowName}"`,
298-
{ cause: error },
299-
);
300-
}
301-
302-
if (typeof keyValue !== "string" || keyValue.trim().length === 0) {
303-
throw new Error(
304-
`Invalid concurrency key for workflow "${workflowName}": expected a non-empty string`,
305-
);
306-
}
307-
308289
let limitValue: unknown;
309290
try {
310291
limitValue =
@@ -328,6 +309,29 @@ function resolveWorkflowConcurrency<Input>(
328309
);
329310
}
330311

312+
let keyValue: string | null = null;
313+
if (concurrency.key !== undefined) {
314+
let resolvedKey: unknown;
315+
try {
316+
resolvedKey =
317+
typeof concurrency.key === "function"
318+
? concurrency.key({ input })
319+
: concurrency.key;
320+
} catch (error) {
321+
throw new Error(
322+
`Failed to resolve concurrency key for workflow "${workflowName}"`,
323+
{ cause: error },
324+
);
325+
}
326+
327+
if (typeof resolvedKey !== "string" || resolvedKey.trim().length === 0) {
328+
throw new Error(
329+
`Invalid concurrency key for workflow "${workflowName}": expected a non-empty string`,
330+
);
331+
}
332+
keyValue = resolvedKey;
333+
}
334+
331335
return {
332336
concurrencyKey: keyValue,
333337
concurrencyLimit: limitValue,

packages/openworkflow/core/workflow-definition.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ export interface WorkflowConcurrencyResolverParams<Input> {
1515
*/
1616
export interface WorkflowConcurrency<Input> {
1717
/**
18-
* Bucket key used to scope concurrency for this run.
18+
* Optional bucket key used to scope concurrency for this run.
19+
* When omitted, runs use the default workflow+version bucket.
1920
*/
20-
readonly key:
21+
readonly key?:
2122
| string
2223
| ((params: Readonly<WorkflowConcurrencyResolverParams<Input>>) => string);
2324
/**

packages/openworkflow/postgres/backend.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,11 @@ export class BackendPostgres implements Backend {
235235

236236
private async acquireConcurrencyCreateLock(
237237
pg: Postgres,
238-
params: { workflowName: string; version: string | null; key: string },
238+
params: {
239+
workflowName: string;
240+
version: string | null;
241+
key: string | null;
242+
},
239243
): Promise<void> {
240244
// Intentionally uses a different lock payload shape than claim-time locks.
241245
// Create-time lock serializes concurrent creates in the bucket, while
@@ -277,7 +281,7 @@ export class BackendPostgres implements Backend {
277281
params: {
278282
workflowName: string;
279283
version: string | null;
280-
key: string;
284+
key: string | null;
281285
limit: number;
282286
},
283287
): Promise<void> {
@@ -288,7 +292,7 @@ export class BackendPostgres implements Backend {
288292
WHERE "namespace_id" = ${this.namespaceId}
289293
AND "workflow_name" = ${params.workflowName}
290294
AND "version" IS NOT DISTINCT FROM ${params.version}
291-
AND "concurrency_key" = ${params.key}
295+
AND "concurrency_key" IS NOT DISTINCT FROM ${params.key}
292296
-- Sleeping runs are excluded so long sleeps do not pin historical
293297
-- limits and block new run creation after config changes.
294298
AND "status" IN ('pending', 'running')
@@ -418,8 +422,7 @@ export class BackendPostgres implements Backend {
418422
AND wr."available_at" <= NOW()
419423
AND (wr."deadline_at" IS NULL OR wr."deadline_at" > NOW())
420424
AND (
421-
wr."concurrency_key" IS NULL
422-
OR wr."concurrency_limit" IS NULL
425+
wr."concurrency_limit" IS NULL
423426
OR CASE
424427
-- cspell:ignore xact hashtextextended
425428
-- Serialize constrained claims per bucket. pg_try_advisory lock
@@ -447,7 +450,7 @@ export class BackendPostgres implements Backend {
447450
WHERE active."namespace_id" = wr."namespace_id"
448451
AND active."workflow_name" = wr."workflow_name"
449452
AND active."version" IS NOT DISTINCT FROM wr."version"
450-
AND active."concurrency_key" = wr."concurrency_key"
453+
AND active."concurrency_key" IS NOT DISTINCT FROM wr."concurrency_key"
451454
AND active."status" = 'running'
452455
-- Candidates require available_at <= NOW(); active leased runs
453456
-- require available_at > NOW(). Keep explicit self-exclusion

packages/openworkflow/postgres/postgres.test.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,16 +105,22 @@ describe("postgres", () => {
105105
const indexes = await pg<
106106
{
107107
indexName: string;
108+
indexDef: string;
108109
}[]
109110
>`
110-
SELECT indexname AS "indexName"
111+
SELECT
112+
indexname AS "indexName",
113+
indexdef AS "indexDef"
111114
FROM pg_indexes
112115
WHERE schemaname = ${schema}
113116
AND tablename = 'workflow_runs'
114117
AND indexname = 'workflow_runs_concurrency_active_idx'
115118
`;
116119
/* cspell:enable */
117120
expect(indexes).toHaveLength(1);
121+
expect(indexes[0]?.indexDef).toMatch(
122+
/WHERE\s+\((?:"concurrency_limit"|concurrency_limit)\s+IS\s+NOT\s+NULL\)/i,
123+
);
118124
} finally {
119125
await dropSchema(pg, schema);
120126
}

0 commit comments

Comments
 (0)