Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ name: Release

on:
workflow_run:
workflows: ["CI"]
workflows: ['CI']
types: [completed]
branches: [main]
workflow_dispatch:

permissions:
contents: write
id-token: write
packages: write

jobs:
release:
Expand Down Expand Up @@ -50,3 +51,8 @@ jobs:

- if: steps.release.outputs.new_release == 'true'
run: npx nx release publish

- if: steps.release.outputs.new_release == 'true'
run: npx nx release publish --registry=https://npm.pkg.github.com
env:
NODE_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
23 changes: 16 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,31 @@ Declarative workflow DSL for orchestrating complex business operations in Javasc

```typescript
const flow = createFlow<{ orderId: string }>('process-order')
.validate('checkInput', (ctx) => { /* ... */ })
.step('fetchOrder', async (ctx) => { /* ... */ })
.step('chargePayment', async (ctx) => { /* ... */ })
.validate('checkInput', (ctx) => {
/* ... */
})
.step('fetchOrder', async (ctx) => {
/* ... */
})
.step('chargePayment', async (ctx) => {
/* ... */
})
.withRetry({ maxAttempts: 3, delayMs: 200, backoffMultiplier: 2 })
.event('orders', (ctx) => ({ eventType: 'order.charged', orderId: ctx.state.order.id }))
.event('orders', (ctx) => ({
eventType: 'order.charged',
orderId: ctx.state.order.id,
}))
.build();

await flow.execute({ orderId: 'ord_123' }, { db, eventPublisher });
```

Type-safe state threading, retries with exponential backoff, timeouts, database transactions, event publishing, parallel execution, and observability hooks — using plain async/await with zero dependencies.
Type-safe state threading, retries with exponential backoff, timeouts, database transactions, event publishing, parallel execution, durability, and observability hooks — using plain async/await with zero dependencies.

## Packages

| Package | Description |
|---------|-------------|
| Package | Description |
| --------------------------------- | --------------------- |
| [`@celom/prose`](packages/prose/) | Core workflow library |

## Development
Expand Down
131 changes: 131 additions & 0 deletions apps/docs/src/content/docs/api/durability-store.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
---
title: Durability Store
description: API reference for DurabilityStore, FlowCheckpoint, DurabilityOptions, and MemoryDurabilityStore.
sidebar:
order: 4
---

import { Aside } from '@astrojs/starlight/components';

For the conceptual overview and patterns, see the [durability guide](/prose/guides/durability/).

## `DurabilityStore`

Pluggable persistence layer for flow checkpoints. Adapters may live in separate packages.

```typescript
interface DurabilityStore {
load(runId: string): Promise<FlowCheckpoint | null>;
save(checkpoint: FlowCheckpoint): Promise<void>;
delete(runId: string): Promise<void>;
}
```

| Method | Semantics |
| ------------------ | ---------------------------------------------------------------------------------------------------------------------------------- |
| `load(runId)` | Return the checkpoint for `runId`, or `null` if none exists. Must not throw on unknown ids. |
| `save(checkpoint)` | Persist the checkpoint, overwriting any existing entry for the same `runId`. Must be atomic — partial writes defeat checkpointing. |
| `delete(runId)` | Remove the checkpoint. No-op for an unknown `runId` — must not throw. |

Implementations are responsible for serialization. JSON is the typical choice; whatever you pick must round-trip `Date` objects (`createdAt`, `updatedAt`) and support `undefined` for `breakValue` slots that are absent.

## `FlowCheckpoint`

The shape persisted to the store. Adapters store and return this verbatim.

```typescript
interface FlowCheckpoint {
flowName: string;
runId: string;
input: unknown;
state: unknown;
completedSteps: string[];
status: 'running' | 'completed' | 'failed';
breakValue?: unknown;
failedStep?: { name: string; error: string };
createdAt: Date;
updatedAt: Date;
}
```

| Field | Description |
| ---------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `flowName` | The flow's name (from `createFlow('name')`). Useful for diagnostics; not used to route resume. |
| `runId` | The `DurabilityOptions.runId` of this run. |
| `input` | The argument the flow was first started with. Used verbatim on resume — the caller's second `input` argument is ignored. |
| `state` | Accumulated state at the moment of the last successful step. |
| `completedSteps` | Names of finished or condition-skipped steps, in execution order. |
| `status` | `'running'` (in progress or crashed), `'completed'` (saved result is final), `'failed'` (errored after exhausting retries). |
| `breakValue` | Only present when the flow short-circuited via `breakIf`. The field's _presence_ distinguishes "broke" from "completed normally" — the value itself may be `undefined`. |
| `failedStep` | Only present when `status === 'failed'`. The step that errored, plus its error message. |
| `createdAt` | When the run first started. Preserved across resumes. |
| `updatedAt` | When the checkpoint was last written. |

## `DurabilityOptions`

The shape passed via `FlowExecutionOptions.durability`.

```typescript
interface DurabilityOptions {
store: DurabilityStore;
runId: string;
}
```

| Field | Description |
| ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `store` | The store to read and write checkpoints through. |
| `runId` | Stable identifier for this run. Same `runId` across `execute()` calls → resume or replay. New `runId` → fresh run. Typically derived from a business identifier. |

## `MemoryDurabilityStore`

A reference `DurabilityStore` that holds checkpoints in a `Map`.

```typescript
import { MemoryDurabilityStore } from '@celom/prose';

const store = new MemoryDurabilityStore();
```

<Aside type="caution">
Intended for tests, local development, and as a reference implementation.
State is held in memory only — it does **not** survive a process exit. Use a
persistent adapter in production.
</Aside>

Test helpers (not part of the `DurabilityStore` interface):

| Method | Purpose |
| ----------------- | ---------------------------------------------------------- |
| `size()` | Number of stored runs. |
| `snapshot(runId)` | Synchronous read of a single checkpoint (returns a clone). |
| `clear()` | Drop all stored checkpoints. |

`MemoryDurabilityStore` clones on both `load()` and `save()` so the executor cannot mutate stored state. Adapters that serialize to bytes get this property for free.

## Implementation contract

Conformance tests for adapter authors live at `packages/prose/src/lib/__tests__/store-conformance.ts`. The file is **not** re-exported from the package entry point and is **not** included in the published `dist/`. Copy or vendor it into your adapter package — or, for adapters in this monorepo, import it directly from source.

```typescript
// Once copied or vendored, run the suite in your adapter's spec file:
import { storeConformanceSuite } from './store-conformance.js';
import { MyStore } from './my-store.js';

storeConformanceSuite('MyStore', () => new MyStore());
```

The suite covers the public `DurabilityStore` contract — round-tripping checkpoints, overwriting on second `save()`, preserving the `failedStep` / `breakValue` fields, no-op `delete()` for unknown ids, and isolation between runIds. Adapter-specific invariants (clone-on-read for in-memory adapters, transaction semantics for SQL adapters) belong in your adapter's own spec.

Implementation notes:

- **Atomic writes.** A reader observing a partial checkpoint defeats the entire point of durability. Wrap the write in a transaction, or write to a temp row and swap.
- **`load()` returns `null`, not throws.** Unknown `runId`s are normal — they mean "fresh run."
- **`delete()` of an unknown id is a no-op.** No error.
- **Clone on read for non-serializing stores.** If your store keeps the checkpoint in memory (or shares an object reference with the caller), clone on `load()` and `save()` to prevent accidental mutation.

## Related

- [Durability guide](/prose/guides/durability/) — concepts, patterns, and feature interactions.
- [Order processing with durability](/prose/examples/order-processing-with-durability/) — end-to-end example with a crash scenario.
- [Execution options](/prose/api/execution-options/) — the `durability` option on `flow.execute()`.
30 changes: 15 additions & 15 deletions apps/docs/src/content/docs/api/error-types.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
title: Error Types
description: API reference for FlowExecutionError, ValidationError, and TimeoutError.
sidebar:
order: 4
order: 5
---

Prose exports three error classes, all extending `Error`.
Expand All @@ -26,10 +26,10 @@ ValidationError.multiple(issues: ValidationIssue[]): ValidationError

### Properties

| Property | Type | Description |
|----------|------|-------------|
| `message` | `string` | Error message |
| `issues` | `ValidationIssue[]` | Array of `{ field, message }` objects |
| Property | Type | Description |
| --------- | ------------------- | ------------------------------------- |
| `message` | `string` | Error message |
| `issues` | `ValidationIssue[]` | Array of `{ field, message }` objects |

### Methods

Expand Down Expand Up @@ -60,20 +60,20 @@ new FlowExecutionError(flowName: string, stepName: string, originalError: Error)

### Properties

| Property | Type | Description |
|----------|------|-------------|
| `flowName` | `string` | Name of the flow |
| `stepName` | `string` | Name of the step that failed |
| `originalError` | `Error` | The actual error thrown by the step |
| Property | Type | Description |
| --------------- | -------- | ----------------------------------- |
| `flowName` | `string` | Name of the flow |
| `stepName` | `string` | Name of the step that failed |
| `originalError` | `Error` | The actual error thrown by the step |

## `TimeoutError`

Thrown when a flow or step exceeds its configured timeout.

### Properties

| Property | Type | Description |
|----------|------|-------------|
| `flowName` | `string` | Name of the flow |
| `stepName` | `string \| undefined` | Step name (undefined for flow-level timeout) |
| `timeoutMs` | `number` | The timeout value that was exceeded |
| Property | Type | Description |
| ----------- | --------------------- | -------------------------------------------- |
| `flowName` | `string` | Name of the flow |
| `stepName` | `string \| undefined` | Step name (undefined for flow-level timeout) |
| `timeoutMs` | `number` | The timeout value that was exceeded |
18 changes: 18 additions & 0 deletions apps/docs/src/content/docs/api/execution-options.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,21 @@ Control behavior for missing optional dependencies.
}
}
```

### `durability`

**Type:** `DurabilityOptions`

Opt-in skip-ahead checkpointing. When set, each successful step persists a checkpoint to the configured store. Re-invoking `execute()` with the same `runId` resumes from the next undone step (or returns the saved result if the run already completed).

```typescript
import { MemoryDurabilityStore } from '@celom/prose';

const store = new MemoryDurabilityStore();

await flow.execute(input, deps, {
durability: { store, runId: input.orderId },
});
```

See the [durability guide](/prose/guides/durability/) for the full story and the [DurabilityStore API reference](/prose/api/durability-store/) for the types.
30 changes: 25 additions & 5 deletions apps/docs/src/content/docs/api/observers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
title: Observers
description: API reference for FlowObserver interface and built-in observer implementations.
sidebar:
order: 5
order: 6
---

## `FlowObserver` interface
Expand All @@ -14,11 +14,31 @@ interface FlowObserver {
onFlowStart?(flowName: string, input: unknown): void;
onFlowComplete?(flowName: string, output: unknown, duration: number): void;
onFlowError?(flowName: string, error: Error, duration: number): void;
onFlowBreak?(flowName: string, breakStepName: string, returnValue: unknown, duration: number): void;
onFlowBreak?(
flowName: string,
breakStepName: string,
returnValue: unknown,
duration: number
): void;
onStepStart?(stepName: string, context: FlowContext): void;
onStepComplete?(stepName: string, result: unknown, duration: number, context: FlowContext): void;
onStepError?(stepName: string, error: Error, duration: number, context: FlowContext): void;
onStepRetry?(stepName: string, attempt: number, maxAttempts: number, error: Error): void;
onStepComplete?(
stepName: string,
result: unknown,
duration: number,
context: FlowContext
): void;
onStepError?(
stepName: string,
error: Error,
duration: number,
context: FlowContext
): void;
onStepRetry?(
stepName: string,
attempt: number,
maxAttempts: number,
error: Error
): void;
onStepSkipped?(stepName: string, context: FlowContext): void;
}
```
Expand Down
35 changes: 26 additions & 9 deletions apps/docs/src/content/docs/api/types.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
title: Types
description: API reference for core TypeScript types exported by Prose.
sidebar:
order: 6
order: 7
---

## `FlowContext`
Expand All @@ -19,12 +19,12 @@ interface FlowContext<TInput, TDeps extends BaseFlowDependencies, TState> {
}
```

| Property | Description |
|----------|-------------|
| `input` | Original input, readonly. Never changes during execution. |
| `state` | Accumulated state from prior steps. Each step's return merges into this. |
| `deps` | Dependencies injected via `.execute()`. |
| `meta` | Runtime metadata (see `FlowMeta`). |
| Property | Description |
| -------- | --------------------------------------------------------------------------- |
| `input` | Original input, readonly. Never changes during execution. |
| `state` | Accumulated state from prior steps. Each step's return merges into this. |
| `deps` | Dependencies injected via `.execute()`. |
| `meta` | Runtime metadata (see `FlowMeta`). |
| `signal` | Combined abort signal from flow timeout, step timeout, and external signal. |

## `BaseFlowDependencies`
Expand Down Expand Up @@ -58,9 +58,23 @@ interface FlowMeta {
startedAt: Date;
currentStep?: string;
correlationId?: string;
// Durability-only — present when execute() was passed a `durability` option
runId?: string;
idempotencyKey?: string;
isResuming?: boolean;
}
```

| Field | Description |
| ---------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `flowName` | The flow's name. |
| `startedAt` | When this `execute()` call began. Not preserved across resumes — set on each invocation. |
| `currentStep` | The name of the step currently executing. |
| `correlationId` | Custom or auto-generated ID propagated to events and observers. |
| `runId` | The `DurabilityOptions.runId` of this run. Present only when durability is configured. |
| `idempotencyKey` | Stable per-step key (`${runId}:${currentStep}`). Pass to external APIs that support idempotency so re-runs are safe. Present only when durability is configured. |
| `isResuming` | `true` when this execution loaded an existing checkpoint rather than starting fresh. Present only when durability is configured. |

## `RetryOptions`

Configuration for `.withRetry()`.
Expand Down Expand Up @@ -116,8 +130,11 @@ When you provide a typed `DatabaseClient` (e.g. from Drizzle), Prose infers `TTx
Utility type that extracts the transaction client type from your dependencies. Useful when extracting transaction step handlers into standalone functions.

```typescript
type TxClientOf<TDeps extends BaseFlowDependencies> =
TDeps extends { db: DatabaseClient<infer TTx> } ? TTx : unknown;
type TxClientOf<TDeps extends BaseFlowDependencies> = TDeps extends {
db: DatabaseClient<infer TTx>;
}
? TTx
: unknown;
```

Define it once in your application types and reuse across extracted step functions:
Expand Down
Loading
Loading