Skip to content

Commit d218dcd

Browse files
Add configurable schema support (#293)
* feat(postgres): add configurable schema support * docs(postgres): normalize schema prose formatting * test: remove assertion casts from postgres schema tests * fix(postgres): validate schema identifier length
1 parent 21e28ce commit d218dcd

7 files changed

Lines changed: 256 additions & 88 deletions

File tree

packages/docs/docs/postgres.mdx

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ const backend = await BackendPostgres.connect(url, {
6060
// Namespace for multi-tenant isolation (default: "default")
6161
namespaceId: "production",
6262

63+
// Database schema for OpenWorkflow tables (default: "openworkflow")
64+
schema: "openworkflow",
65+
6366
// Whether to run migrations on connect (default: true)
6467
runMigrations: true,
6568
});
@@ -68,7 +71,7 @@ const backend = await BackendPostgres.connect(url, {
6871
## Migrations
6972

7073
By default, `BackendPostgres.connect()` runs database migrations automatically.
71-
This creates the `openworkflow` schema and required tables.
74+
This creates the configured schema (default: `openworkflow`) and required tables.
7275

7376
To disable automatic migrations:
7477

@@ -82,10 +85,10 @@ When disabled, ensure you run migrations separately before starting workers.
8285

8386
## Schema
8487

85-
OpenWorkflow creates tables in the `openworkflow` schema:
88+
OpenWorkflow creates tables in the configured schema (default: `openworkflow`):
8689

87-
- `openworkflow.workflow_runs` - Stores workflow run state
88-
- `openworkflow.step_attempts` - Stores step execution history
90+
- `<schema>.workflow_runs` - Stores workflow run state
91+
- `<schema>.step_attempts` - Stores step execution history
8992

9093
This keeps OpenWorkflow data separate from your application tables.
9194

@@ -135,4 +138,4 @@ automatically and connections are reused across workflow executions.
135138
- The connecting user needs permissions to:
136139
- Create schemas (for migrations)
137140
- Create tables (for migrations)
138-
- Read/write to the `openworkflow` schema
141+
- Read/write to the configured schema (default: `openworkflow`)

packages/docs/docs/production.mdx

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ For most production use cases, use PostgreSQL 14 or later:
4040

4141
- The connecting user needs permissions to create schemas and tables (for
4242
migrations)
43-
- Workflow state is stored in the `openworkflow` schema
43+
- Workflow state is stored in the configured schema (default: `openworkflow`)
4444

4545
For single-server deployments, SQLite works well.
4646

@@ -63,6 +63,7 @@ export default defineConfig({
6363
process.env.OPENWORKFLOW_POSTGRES_URL!,
6464
{
6565
namespaceId: process.env.OPENWORKFLOW_NAMESPACE_ID || "production",
66+
schema: process.env.OPENWORKFLOW_SCHEMA || "openworkflow",
6667
},
6768
),
6869
dirs: ["./openworkflow"],
@@ -72,16 +73,17 @@ export default defineConfig({
7273
});
7374
```
7475

75-
In this example, we use the `OPENWORKFLOW_POSTGRES_URL` and
76-
`OPENWORKFLOW_NAMESPACE_ID` env vars to dynamically set the Postgres URL and
77-
namespace, but you can set this up however you'd like.
76+
In this example, we use `OPENWORKFLOW_POSTGRES_URL`,
77+
`OPENWORKFLOW_NAMESPACE_ID`, and `OPENWORKFLOW_SCHEMA` env vars to dynamically
78+
set the Postgres URL, namespace, and schema, but you can set this up however
79+
you'd like.
7880

7981
### 3. Migrations
8082

8183
Migrations run automatically when the backend connects. OpenWorkflow creates:
8284

83-
- `openworkflow.workflow_runs` - Stores workflow run state
84-
- `openworkflow.step_attempts` - Stores step execution history
85+
- `<schema>.workflow_runs` - Stores workflow run state
86+
- `<schema>.step_attempts` - Stores step execution history
8587

8688
### 4. Deploy Workers
8789

packages/openworkflow/client.test.ts

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS } from "./backend.js";
22
import { OpenWorkflow } from "./client.js";
33
import { BackendPostgres } from "./postgres.js";
4-
import { DEFAULT_POSTGRES_URL, Postgres } from "./postgres/postgres.js";
4+
import {
5+
DEFAULT_POSTGRES_URL,
6+
DEFAULT_SCHEMA,
7+
newPostgresMaxOne,
8+
} from "./postgres/postgres.js";
59
import {
610
DEFAULT_WORKFLOW_RETRY_POLICY,
711
defineWorkflowSpec,
@@ -372,20 +376,21 @@ describe("OpenWorkflow", () => {
372376

373377
const first = await workflow.run({ value: 1 }, { idempotencyKey: key });
374378

375-
const internalBackend = backend as unknown as {
376-
pg: Postgres;
377-
namespaceId: string;
378-
};
379379
const staleCreatedAt = new Date(
380380
Date.now() - DEFAULT_RUN_IDEMPOTENCY_PERIOD_MS - 60_000,
381381
);
382-
383-
await internalBackend.pg`
384-
UPDATE "openworkflow"."workflow_runs"
385-
SET "created_at" = ${staleCreatedAt}
386-
WHERE "namespace_id" = ${internalBackend.namespaceId}
387-
AND "id" = ${first.workflowRun.id}
388-
`;
382+
const pg = newPostgresMaxOne(DEFAULT_POSTGRES_URL);
383+
try {
384+
const workflowRunsTable = pg`${pg(DEFAULT_SCHEMA)}.${pg("workflow_runs")}`;
385+
await pg`
386+
UPDATE ${workflowRunsTable}
387+
SET "created_at" = ${staleCreatedAt}
388+
WHERE "namespace_id" = ${first.workflowRun.namespaceId}
389+
AND "id" = ${first.workflowRun.id}
390+
`;
391+
} finally {
392+
await pg.end();
393+
}
389394

390395
const second = await workflow.run({ value: 2 }, { idempotencyKey: key });
391396
expect(second.workflowRun.id).not.toBe(first.workflowRun.id);
@@ -531,10 +536,12 @@ describe("OpenWorkflow", () => {
531536
const backend = await createBackend();
532537
const client = new OpenWorkflow({ backend });
533538

534-
const workflow = client.defineWorkflow(
535-
{ name: "define-wrap-test" },
536-
({ input }) => ({ doubled: (input as { n: number }).n * 2 }),
537-
);
539+
const workflow = client.defineWorkflow<
540+
{ n: number },
541+
{ doubled: number }
542+
>({ name: "define-wrap-test" }, ({ input }) => ({
543+
doubled: input.n * 2,
544+
}));
538545

539546
const handle = await workflow.run({ n: 21 });
540547
const worker = client.newWorker();

packages/openworkflow/postgres/backend.test.ts

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { testBackend } from "../backend.testsuite.js";
22
import { BackendPostgres } from "./backend.js";
3-
import { DEFAULT_POSTGRES_URL } from "./postgres.js";
3+
import {
4+
DEFAULT_POSTGRES_URL,
5+
dropSchema,
6+
newPostgresMaxOne,
7+
} from "./postgres.js";
48
import assert from "node:assert";
59
import { randomUUID } from "node:crypto";
610
import { describe, expect, test } from "vitest";
@@ -26,4 +30,67 @@ describe("BackendPostgres.connect errors", () => {
2630
/Postgres backend failed to connect.*postgresql:\/\/user:pass@host:port\/db.*:/,
2731
);
2832
});
33+
34+
test("throws a clear error for invalid schema names", async () => {
35+
await expect(
36+
BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
37+
schema: "invalid-schema",
38+
}),
39+
).rejects.toThrow(/Invalid schema name/);
40+
});
41+
42+
test("throws for schema names longer than 63 bytes", async () => {
43+
await expect(
44+
BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
45+
schema: "a".repeat(64),
46+
}),
47+
).rejects.toThrow(/at most 63 bytes/i);
48+
});
49+
});
50+
51+
describe("BackendPostgres schema option", () => {
52+
test("stores workflow data in the configured schema", async () => {
53+
const schema = `test_schema_${randomUUID().replaceAll("-", "_")}`;
54+
const namespaceId = randomUUID();
55+
const backend = await BackendPostgres.connect(DEFAULT_POSTGRES_URL, {
56+
namespaceId,
57+
schema,
58+
});
59+
60+
try {
61+
const workflowRun = await backend.createWorkflowRun({
62+
workflowName: "schema-test",
63+
version: null,
64+
idempotencyKey: null,
65+
input: null,
66+
config: {},
67+
context: null,
68+
availableAt: null,
69+
deadlineAt: null,
70+
});
71+
72+
const pg = newPostgresMaxOne(DEFAULT_POSTGRES_URL);
73+
try {
74+
const workflowRunsTable = pg`${pg(schema)}.${pg("workflow_runs")}`;
75+
76+
const [record] = await pg<{ id: string }[]>`
77+
SELECT "id"
78+
FROM ${workflowRunsTable}
79+
WHERE "namespace_id" = ${namespaceId}
80+
AND "id" = ${workflowRun.id}
81+
LIMIT 1
82+
`;
83+
84+
expect(record?.id).toBe(workflowRun.id);
85+
} finally {
86+
await pg.end();
87+
}
88+
} finally {
89+
await backend.stop();
90+
91+
const pg = newPostgresMaxOne(DEFAULT_POSTGRES_URL);
92+
await dropSchema(pg, schema);
93+
await pg.end();
94+
}
95+
});
2996
});

0 commit comments

Comments
 (0)