The github.com/deepnoodle-ai/workflow/experimental/store/postgres
package is a Postgres-backed Store that satisfies every persistence
interface the workflow engine and its worker need:

- `worker.QueueStore`: the run queue, claim loop, heartbeat, reaper, and terminal state.
- `workflow.Checkpointer`: lease-fenced checkpoint persistence for durable resume.
- `workflow.StepProgressStore`: per-step observability for UIs and dashboards.
- `workflow.ActivityLogger`: append-only activity history.

One pool, one schema, one place to look when something goes wrong.
The package depends only on jackc/pgx/v5 so the root workflow
module can stay stdlib-only. It lives in its own Go module so you can
depend on it (or not) without pulling pgx into consumers that use a
different database.

```sh
go get github.com/deepnoodle-ai/workflow/experimental/store/postgres
```

Inside this repository the top-level go.work already wires things
up, so you don't need to do anything extra to build the package
locally.

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/deepnoodle-ai/workflow/experimental/store/postgres"
)

func main() {
	ctx := context.Background()

	pool, err := pgxpool.New(ctx, "postgres://user:pass@localhost:5432/workflow")
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	store := postgres.New(pool)
	if err := store.Migrate(ctx); err != nil {
		log.Fatal(err)
	}

	// store now satisfies worker.QueueStore, workflow.StepProgressStore,
	// workflow.ActivityLogger, and exposes NewCheckpointer(lease).
}
```

- You own the pool. `postgres.New` takes an already-constructed `*pgxpool.Pool` and does not close it for you.
- `Migrate` is idempotent. It runs the embedded `schema.sql` (`CREATE TABLE IF NOT EXISTS`) on every call. It is safe to call on every startup; skip it in production if you prefer managing migrations externally.
- `WithLogger(slog.Logger)` attaches a structured logger; otherwise the store logs to `io.Discard`.

Migrate creates three tables.
`workflow_runs` is the durable queue and state table. It holds one row per run, including the checkpoint blob.

| Column | Type | Notes |
|---|---|---|
| `id` | `TEXT PK` | The stable run identifier, also used as the workflow `ExecutionID`. |
| `spec` | `BYTEA` | Opaque payload from `Enqueue`. The worker never inspects it. |
| `status` | `TEXT` | One of the `worker.Status` values. |
| `attempt` | `INTEGER` | 0 for queued, increments on each claim. |
| `claimed_by` | `TEXT` | `WorkerID` of the current leaseholder, or `''`. |
| `heartbeat_at` | `TIMESTAMPTZ` | Refreshed by the worker's heartbeat goroutine. |
| `checkpoint` | `BYTEA` | JSON-encoded `workflow.Checkpoint`. |
| `result` | `BYTEA` | Opaque terminal/dormant payload from `Outcome.Result`. |
| `error_message` | `TEXT` | Failure reason for `StatusFailed`. |
| `created_at` | `TIMESTAMPTZ` | `NOW()` at enqueue time. |
| `started_at` | `TIMESTAMPTZ` | First claim timestamp. |
| `completed_at` | `TIMESTAMPTZ` | Set when the run reaches a terminal status. |

Indexes:

- `(status, created_at)`: claim loop ordering.
- `(status, heartbeat_at)`: reaper scans.

The step progress table holds one row per `(execution_id, step_name, branch_id)`; the latest
status update wins via `ON CONFLICT ... DO UPDATE`. A step that runs
on two branches produces two rows.
Use it to power a UI that watches workflow progress — the row stores
status, activity, attempt, started_at, finished_at,
error, and a JSONB detail blob for whatever extra context the
engine emits.
The activity log table is an append-only record of every activity
invocation with its parameters, result, error, start time, and
duration. Rows are keyed by a stable `entry.ID`. It is a good fit for
audit trails, replay analysis, and post-mortem debugging.
Indexed by (execution_id, start_time) so pulling the full history
for one run is a single range scan.
A single *postgres.Store is usually handed to four places in a
running system: the worker, each workflow execution's checkpointer,
its progress store, and its activity logger.
Pass the store straight to worker.New:

```go
w, _ := worker.New(worker.Config{
	QueueStore: store,
	Handler:    myHandler,
})
```

`Enqueue`, `ClaimQueued`, `Heartbeat`, `Complete`, `ReclaimStale`,
and `DeadLetterStale` all live on the store and do what the
`QueueStore` contract requires.
The claim uses SELECT ... FOR UPDATE SKIP LOCKED inside a short
transaction, so multiple workers claiming concurrently never receive
the same row. No advisory locks, no polling backoff fights — the
oldest queued row wins.
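
To make the claim pattern concrete, here is a rough sketch of what a `SKIP LOCKED` claim can look like when folded into a single statement. It is not the package's actual query: the store is described as using a short transaction, the status literals `'queued'` and `'running'` are assumptions, and only the column names are taken from the `workflow_runs` table.

```go
package main

import "fmt"

// claimSQL is an illustrative guess at a claim statement, not the query the
// package runs. The status literals are assumed; $1 is the claiming worker's ID.
const claimSQL = `
UPDATE workflow_runs
SET status       = 'running',
    attempt      = attempt + 1,
    claimed_by   = $1,
    heartbeat_at = NOW(),
    started_at   = COALESCE(started_at, NOW())
WHERE id = (
    SELECT id
    FROM workflow_runs
    WHERE status = 'queued'
    ORDER BY created_at          -- oldest queued row first
    LIMIT 1
    FOR UPDATE SKIP LOCKED       -- rows locked by another claimer are skipped
)
RETURNING id, attempt;`

func main() {
	// Print the statement so it can be pasted into psql for experimentation.
	fmt.Println(claimSQL)
}
```

Because a row locked by one claimer is skipped rather than waited on, a second worker running the same statement moves on to the next oldest queued row instead of blocking.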
Store.NewCheckpointer(lease) returns a workflow.Checkpointer
whose writes are fenced on (claimed_by, attempt). Hand one to each
execution when you construct it:

```go
lease := worker.Lease{RunID: c.ID, WorkerID: w.ID(), Attempt: c.Attempt}
cp := store.NewCheckpointer(lease)

exec, err := workflow.NewExecution(wf, registry,
	workflow.WithExecutionID(c.ID),
	workflow.WithCheckpointer(cp),
)
```

- `SaveCheckpoint` writes under the lease. A worker that has lost its claim sees `worker.ErrLeaseLost` on the next save, and the engine surfaces the error to the handler, which should treat it as "my lease is gone, stop" and return cleanly.
- `LoadCheckpoint` is intentionally unfenced. A fresh attempt must be able to read a prior attempt's snapshot, regardless of who wrote it.
- `DeleteCheckpoint` clears the `checkpoint` column without deleting the run row.

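
One way a handler might treat a lost lease is sketched below. To keep the example compilable without the workflow module, `errLeaseLost` stands in for `worker.ErrLeaseLost`, and `runExecution` and `handle` are hypothetical names; the real handler signature may look different.

```go
package main

import (
	"errors"
	"fmt"
)

// errLeaseLost is a local stand-in for worker.ErrLeaseLost.
var errLeaseLost = errors.New("lease lost")

// runExecution is a hypothetical placeholder for running a workflow. Here it
// simulates a checkpoint save that fails because another worker took over.
func runExecution() error {
	return fmt.Errorf("save checkpoint: %w", errLeaseLost)
}

// handle stops quietly when the lease is gone and propagates any other error.
func handle(run func() error) (stopped bool, err error) {
	err = run()
	if errors.Is(err, errLeaseLost) {
		// Another worker owns the run now; this is not a failure of the run.
		return true, nil
	}
	return false, err
}

func main() {
	stopped, err := handle(runExecution)
	fmt.Println(stopped, err)
}
```

Using `errors.Is` rather than `==` keeps the check working when the engine wraps the error on its way to the handler.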
Checkpoints are stored as JSON-encoded workflow.Checkpoint blobs in
the checkpoint column — a single column update, no extra tables,
no version history. The checkpointer will refuse to load a snapshot
written by a newer library version (SchemaVersion
compatibility check) and will return workflow.ErrNoCheckpoint when
the row exists but has NULL data — the signal the engine needs to
start fresh.
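
The load rules above can be sketched in a few lines. This is an in-memory approximation, not the package's code: `errNoCheckpoint` stands in for `workflow.ErrNoCheckpoint`, while `errNewerSchema`, `supportedSchemaVersion`, and the `schema_version` JSON tag are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// supportedSchemaVersion is a hypothetical "newest version this build reads".
const supportedSchemaVersion = 1

var (
	errNoCheckpoint = errors.New("no checkpoint") // stand-in for workflow.ErrNoCheckpoint
	errNewerSchema  = errors.New("checkpoint written by a newer schema version")
)

// checkpoint models only the version field; the JSON tag is an assumption.
type checkpoint struct {
	SchemaVersion int `json:"schema_version"`
}

// loadCheckpoint takes the raw checkpoint column; nil models SQL NULL.
func loadCheckpoint(column []byte) (*checkpoint, error) {
	if column == nil {
		return nil, errNoCheckpoint // row exists, nothing saved yet: start fresh
	}
	var cp checkpoint
	if err := json.Unmarshal(column, &cp); err != nil {
		return nil, fmt.Errorf("decode checkpoint: %w", err)
	}
	if cp.SchemaVersion > supportedSchemaVersion {
		return nil, errNewerSchema // refuse snapshots from a newer library
	}
	return &cp, nil
}

func main() {
	_, err := loadCheckpoint(nil)
	fmt.Println(err)

	_, err = loadCheckpoint([]byte(`{"schema_version": 2}`))
	fmt.Println(err)

	cp, err := loadCheckpoint([]byte(`{"schema_version": 1}`))
	fmt.Println(cp.SchemaVersion, err)
}
```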
Pass the store to workflow.WithStepProgressStore and the engine
will emit progress updates as steps run. Upserts are keyed by
(execution_id, step_name, branch_id); each update overwrites the
previous row for that key. The detail field is stored as JSONB so
you can query into it with standard SQL.
Step progress writes are fire-and-forget from the engine's side — a dropped write will not block the workflow. Transient errors are logged and the next update replaces the row anyway.
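
The upsert semantics can be modelled with a map keyed by the same triple. The sketch below is an in-memory analogue only; the type names and the status strings are hypothetical and do not come from the package.

```go
package main

import "fmt"

// progressKey mirrors the (execution_id, step_name, branch_id) upsert key.
type progressKey struct {
	ExecutionID string
	StepName    string
	BranchID    string
}

// progressRow holds a couple of the columns the row stores.
type progressRow struct {
	Status  string
	Attempt int
}

// progressTable is a stand-in for the table: one row per key.
type progressTable map[progressKey]progressRow

// upsert overwrites any existing row for the key, like ON CONFLICT ... DO UPDATE.
func (tbl progressTable) upsert(k progressKey, r progressRow) {
	tbl[k] = r
}

func main() {
	tbl := progressTable{}
	mainBranch := progressKey{"run-1", "fetch", "main"}
	otherBranch := progressKey{"run-1", "fetch", "retry"}

	tbl.upsert(mainBranch, progressRow{Status: "running", Attempt: 1})
	tbl.upsert(mainBranch, progressRow{Status: "completed", Attempt: 1}) // replaces the row
	tbl.upsert(otherBranch, progressRow{Status: "running", Attempt: 1})  // separate row

	fmt.Println(len(tbl), tbl[mainBranch].Status)
}
```

Since each write carries the full current state for its key, a dropped intermediate update is simply replaced by the next one.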
Pass the store to workflow.WithActivityLogger for an append-only
audit trail:

```go
exec, _ := workflow.NewExecution(wf, registry,
	workflow.WithCheckpointer(store.NewCheckpointer(lease)),
	workflow.WithStepProgressStore(store),
	workflow.WithActivityLogger(store),
)
```

`LogActivity` inserts a row per invocation; `GetActivityHistory`
reads them back in start-time order for a given `execution_id`.
Parameters and results are stored as JSONB, so structured queries
(`WHERE parameters->>'url' LIKE ...`) are straightforward.
Every state-changing write on an active run carries
(claimed_by, attempt) in its WHERE clause. The pattern looks
like:

```sql
UPDATE workflow_runs
SET ...
WHERE id = $1
  AND claimed_by = $2
  AND attempt = $3;
```

If the row has been reclaimed by the reaper (`attempt` bumped, or
`claimed_by` cleared) the update matches zero rows and the store
returns `worker.ErrLeaseLost`. The worker turns that into a
graceful stop for the run.
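
The fencing rule is easy to see in an in-memory model. The sketch below mimics the `WHERE claimed_by = ... AND attempt = ...` check with plain structs; `runRow`, `lease`, and `fencedSave` are hypothetical names, and `errLeaseLost` stands in for `worker.ErrLeaseLost`.

```go
package main

import (
	"errors"
	"fmt"
)

// errLeaseLost is a local stand-in for worker.ErrLeaseLost.
var errLeaseLost = errors.New("lease lost")

// runRow models the fencing columns of a run plus its checkpoint.
type runRow struct {
	ClaimedBy  string
	Attempt    int
	Checkpoint []byte
}

// lease is what a worker remembers from the moment it claimed the run.
type lease struct {
	WorkerID string
	Attempt  int
}

// fencedSave writes only if the row still belongs to the caller's lease,
// which corresponds to the UPDATE matching exactly one row.
func fencedSave(row *runRow, l lease, data []byte) error {
	if row.ClaimedBy != l.WorkerID || row.Attempt != l.Attempt {
		return errLeaseLost // the UPDATE would match zero rows
	}
	row.Checkpoint = data
	return nil
}

func main() {
	row := &runRow{ClaimedBy: "worker-a", Attempt: 1}
	stale := lease{WorkerID: "worker-a", Attempt: 1}

	fmt.Println(fencedSave(row, stale, []byte("v1"))) // lease still valid

	// The reaper reclaims the run and worker-b claims it as attempt 2.
	row.ClaimedBy, row.Attempt = "worker-b", 2

	fmt.Println(fencedSave(row, stale, []byte("v2"))) // stale lease is rejected
	fmt.Println(string(row.Checkpoint))               // worker-a's late write did not land
}
```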
There is one notable exception: Complete does not filter by
status = 'running'. This is deliberate. A handler can legitimately
write a terminal status after the run context has been cancelled,
and we want that write to land even if the run ticked past
running in-memory. Fencing on (claimed_by, attempt) is enough to
keep two workers from racing.
The postgres test suite is gated on a real database. Without
WORKFLOW_PG_DSN the tests skip silently.

```sh
WORKFLOW_PG_DSN="postgres://postgres@localhost:5432/workflow_test?sslmode=disable" \
  go test ./experimental/store/postgres/...
```

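
If you write your own integration tests against the store, the same gating is straightforward to reproduce. The helper below is a sketch, and `testDSN` is a hypothetical name rather than something the package exports; in a real `_test.go` file the "skipping" branch would be a call to `t.Skip`.

```go
package main

import (
	"fmt"
	"os"
)

// testDSN returns the DSN from WORKFLOW_PG_DSN and whether
// database-backed tests should run at all.
func testDSN() (string, bool) {
	dsn := os.Getenv("WORKFLOW_PG_DSN")
	return dsn, dsn != ""
}

func main() {
	if _, ok := testDSN(); !ok {
		// In a test function this would be: t.Skip("WORKFLOW_PG_DSN not set")
		fmt.Println("WORKFLOW_PG_DSN not set; skipping Postgres tests")
		return
	}
	fmt.Println("WORKFLOW_PG_DSN set; running Postgres tests")
}
```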
The tests run Migrate and then TRUNCATE all three tables between
runs, so point them at a throwaway database. They cover:

- Enqueue → claim → heartbeat → complete round-trip.
- Heartbeat rejection under a wrong `WorkerID` or wrong `Attempt`.
- Reaper reclaim and dead-letter thresholds.
- Checkpointer round-trip including lease-lost behavior.
- Step progress upsert and activity log append/read.

What this package deliberately leaves out:

- No signal store. The workflow engine supports `SignalStore` for signal-wait patterns, but this package does not ship a Postgres-backed implementation. Provide one yourself if you need it; it is additive and independent of the tables here.
- No metrics hooks. Observability belongs to the logger you attach via `WithLogger`; if you want Prometheus or OpenTelemetry, wrap the store.
- No tenancy or authorization. The store trusts whatever IDs and specs you hand it. Isolation between customers, projects, or environments is a concern for the layer above.
- No separate archive table. Completed, failed, and suspended runs remain in `workflow_runs`. Archival or pruning is the operator's responsibility.
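
As an illustration of "wrap the store" for metrics, the sketch below decorates a single method with counters. Everything in it is hypothetical: `heartbeater` is a made-up one-method interface, and its `Heartbeat` signature is an assumption that may not match the real `QueueStore` contract. In practice the counters would be replaced by Prometheus or OpenTelemetry instruments.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// heartbeater is a hypothetical slice of the store's surface; the real
// method signature may differ.
type heartbeater interface {
	Heartbeat(ctx context.Context, runID string) error
}

// countingStore wraps another heartbeater and counts calls and failures.
type countingStore struct {
	inner    heartbeater
	calls    atomic.Int64
	failures atomic.Int64
}

// Heartbeat records the call, delegates to the wrapped store, and records
// a failure if the wrapped call returned an error.
func (c *countingStore) Heartbeat(ctx context.Context, runID string) error {
	c.calls.Add(1)
	err := c.inner.Heartbeat(ctx, runID)
	if err != nil {
		c.failures.Add(1)
	}
	return err
}

// fakeStore is a stand-in for the real store so the sketch runs on its own.
type fakeStore struct{}

func (fakeStore) Heartbeat(context.Context, string) error { return nil }

// demo sends one heartbeat through the wrapper and reports the counters.
func demo() (calls, failures int64) {
	s := &countingStore{inner: fakeStore{}}
	_ = s.Heartbeat(context.Background(), "run-1")
	return s.calls.Load(), s.failures.Load()
}

func main() {
	fmt.Println(demo())
}
```

Wrapping at the interface boundary keeps the store itself free of metrics dependencies, which matches the package's goal of a small dependency footprint.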