Skip to content

Wf refactor#108

Merged
bd-mkt merged 18 commits into
mainfrom
wf_refactor
May 18, 2026
Merged

Wf refactor#108
bd-mkt merged 18 commits into
mainfrom
wf_refactor

Conversation

@bd-mkt
Copy link
Copy Markdown
Collaborator

@bd-mkt bd-mkt commented May 18, 2026

Workflow engine refactor: class-based Worker, atomic claim + lease tokens, ResourceLock

Summary

This branch is primarily a foundational refactor of the workflow execution engine: it replaces a module-global runner with a class-based Worker that owns typed consumer pools, lease-gated terminal writes, and a cross-subsystem ResourceLock table. The new persistence layer eliminates several long-standing concurrency bugs in the original runner (process-local locking, TOCTOU on group completion, no protection against stale workers double-finalizing steps). A handful of smaller fixes ride along: compression for filesystem artifacts, a free-disk-space pre-check, async haiku.rag wiring, docling refactor, and broad documentation updates.

Workflow engine refactor

Implemented across phases 1–8 (see commits f0728ab, 545a1a7, 44631cd) plus a final atomic-claim race fix.

Persistence layer (wf/operations.py)

  • Atomic claimclaim_next_step(worker_id, lease_token, allowed_types=…) replaces get_runnable_steps + set_step_status(RUNNING) with a single UPDATE-FROM-SELECT-FOR-UPDATE-SKIP-LOCKED. Per-type concurrency is bounded at the SQL layer, eliminating in-process semaphores.
  • Lease tokens — every claim mints a per-claim UUID stamped on runstep.lease_token. complete_step / error_step / release_step are gated on the lease, so a worker reaped between claim and write can never bounce a fresh claimant.
  • ResourceLock table — cross-subsystem rendezvous keyed by resource_key (typically rag:<abs-db-path>). All RAG-DB writers (workflow save_to_rag steps, web vacuum endpoint, si-diag CLI, end_group lifecycle vacuums) coordinate here. TTL-based; expired rows are swept by a background loop.
  • Atomic claim + lock — when a step has a resource_key, the matching ResourceLock row is INSERTed in the same transaction as the claim. The unique PK constraint serializes concurrent claims targeting the same key (e.g. multiple save_to_rag steps for the same LanceDB); the loser rolls back the whole claim and returns None. Closes the residual race window that READ COMMITTED isolation left open between two claim transactions.
  • try_complete_run_group — atomic conditional UPDATE so GROUP_END fires exactly once across N workers. Replaces the TOCTOU window in the old get_run_group_stats snapshot read.
  • Self-skipping reaperreap_dead_workers(my_id, threshold) always excludes the caller, eliminating the self-reaping race where a stalled checkin loop would let a worker reset its own in-flight steps.

Orchestration layer (wf/runner.py)

  • class Worker — instance-scoped state; multiple workers can coexist in one process. Configurable per-type consumer pools: Worker(WorkerConfig(consumers={"parse": 4, "store": 8, "*": 2})).
  • Graceful shutdownWorker.stop(timeout) waits for in-flight steps, then cancels remaining consumers. Cancelled consumers call release_step so rows return to PENDING immediately rather than waiting WORKER_CHECKIN_TIMEOUT. The worker also deletes its own WorkerCheckin row so peers see the departure right away.
  • Event-driven LifecycleBus — fire-and-forget tasks so a slow STEP_START handler can never block step execution.
  • Pluggable Metrics protocol — counters (claim_attempts, claim_success, claim_idle, claim_lost_race, step_completed, step_error, step_failed, step_released, worker_reaped, lease_lost, resource_lock_swept) and histograms (claim_duration, step_duration). Default LoggingMetrics is a DEBUG no-op; production wires Prometheus.
  • Backoff on race-release — when _run_step race-releases on a resource lock (rare defense-in-depth path after atomic claim), the consumer applies idle-style exponential backoff so it doesn't spin-claim until the holder finishes. poll_backoff_max tuned from 5s → 3s to match typical LanceDB write durations.

Caller wiring

  • lib/rag.py — dropped the per-DB lock_map. Added resource_key_for(db_path) and hold_rag_lock(...) context manager for direct-from-Python writers. vacuum_db(db_name, sign, holder_kind, max_wait) wraps the operation in a ResourceLock; max_wait=0 for fail-fast.
  • lib/docling.py — module-level HTTP semaphore removed; parse concurrency is now expressed by the parse consumer pool size.
  • server/routes/lancedb.py — vacuum endpoint returns 200 / 404 / 409 (lock held by another writer) / 500. Routes through vacuum_db(holder_kind=WEB, max_wait=0).
  • si-diag lancedb vacuum--force flag calls force_release_resource_lock(...) (audit-logged) to break a stuck lock.
  • si-cli vacuum / vacuum-allholder_kind=CLI; vacuum-all skips DBs whose lock is currently held.
  • FastAPI lifespan + si-cli worker construct a Worker directly with {"parse": docling_concurrency, "store": worker_task_count, "*": worker_task_count} and call Worker.stop(30s) on shutdown.

Database migration

New migration a17c4d9b3e02_wf_refactor adds:

  • runstep.lease_token (nullable VARCHAR)
  • runstep.resource_key (nullable VARCHAR, ix_runstep_resource_key index)
  • resourcelock table + ix_resourcelock_expires_at index

A drop-in SQL upgrade script for environments where Alembic can't run in-place is committed at scripts/wf_refactor_upgrade.sql; the matching afsoc-rag init upgrade script is at afsoc-rag/docker/pgsql-init/upgrade_scripts/afsoc_ingester_002.sql.

Other improvements

  • Artifact compression (c439bd6) — new FILE_COMPRESSION_ARTIFACTS / FILE_COMPRESSION_LEVEL settings opt-in zstd compression for filesystem-backed artifact types.
  • Free disk space pre-check + I/O cleanup (f79b3df) — workflows refuse to start steps that would partially fill the disk; cleanup actions run on I/O errors to avoid partial writes.
  • Splitting throughput tweaks — perf-oriented adjustments to chunk/split settings.
  • Async haiku.rag (9a09980) — locked to >0.44; check_exists factored out of the rag client.
  • Param-set sort by id (c439bd6) — predictable ordering in CLI/diag outputs.
  • Dynamic /run/secrets handlingSettings only opts into the secrets directory if it actually exists, suppressing spurious warnings in non-container environments.
  • uv upgrade in the Dockerfile.

Documentation

  • docs/ARCHITECTURE.md — rewrote §3 Worker System and the execution-flow steps to cover typed pools, lease tokens, ResourceLock, graceful shutdown, metrics, and exactly-once group completion.
  • docs/WORKFLOWS.md — replaced "Worker Processing" with the atomic-claim flow; added Consumer Pools, Graceful Shutdown, Metrics, and resource-lock troubleshooting sections.
  • docs/DATABASE.mdlease_token and resource_key columns on RunStep, new ResourceLock table, ResourceLockKind enum, CANCELLED in RunStatus, indexes from the migration.
  • docs/API.md/lancedb/vacuum 404 / 409 / 500 response shapes and ResourceLock behavior.
  • docs/CLI.mdsi-diag lancedb subcommand docs (vacuum --force, vacuum-all, verify); updated si-cli vacuum to mention the cross-subsystem lock.
  • docs/CONFIGURATION.md — rewrote DOCLING_CONCURRENCY / WORKER_TASK_COUNT / INGEST_WORKER_CONCURRENCY for the new consumer-pool semantics; added FILE_COMPRESSION_* settings.

Tests

  • tests/unit/test_runner.py — rewritten against the new pure helpers (transition_allowed, elevate_terminal).
  • tests/unit/test_runner_internals.py (new, +1487 lines) — lifts runner.py coverage from 26% to 100%. Covers LifecycleBus, Worker construction/start/stop/cancellation, consumer-loop backoff (including race-release), heartbeat / reaper / lock-sweeper loops, default lifecycle hooks, module-level shim parity.
  • tests/unit/test_wf_claim.py (new, +586 lines) — claim_next_step priority/skip-lock/resource-key behavior; atomic lock insert on claim; acquire_resource_lock idempotence for the same holder; lease-gated terminal writes; reaper self-skip and orphan reset; end-to-end stale-worker block; recompute_run_status and try_complete_run_group semantics; _rag_resource_key_for_param stamping at workflow-run creation.
  • tests/unit/test_commit_rollback_paths.py (new, +1066 lines) — exhaustive transaction-handling coverage.
  • tests/unit/test_lancedb_routes.py — vacuum endpoint mocks the new rag.vacuum_db entry point and asserts 404 / 409 paths.

741 unit tests pass; full branch coverage on runner.py; suite-wide coverage up from 50% min → 72%.

Operator notes

  • Migration required: existing deployments must apply a17c4d9b3e02_wf_refactor (or the SQL drop-in) before starting the new worker. Running the new worker against the old schema will error on the missing lease_token / resource_key columns and resourcelock table.
  • DOCLING_CONCURRENCY semantics changed: was an in-process HTTP semaphore; is now the size of the dedicated parse consumer pool. Behavior is equivalent for the default parse workload, but operators who relied on the global cap (INGEST_WORKER_CONCURRENCY) should review their pool sizing.
  • Vacuum coordination: workflow save_to_rag steps, web vacuum, CLI vacuum, and lifecycle vacuum now coordinate via ResourceLock. The web endpoint is fail-fast (409); the CLI defaults to wait-forever; si-diag lancedb vacuum --force is the escape hatch for a genuinely stuck lock.

Test plan

  • Run a multi-document batch ingestion targeting a single LanceDB; verify save_to_rag steps serialize (no concurrent writers, no race-release log spam)
  • Run a batch targeting two distinct LanceDBs; verify they process in parallel
  • Restart the worker mid-batch; verify in-flight steps return to PENDING within seconds (not after the checkin timeout)
  • [ x Hit /api/v1/lancedb/vacuum while a save_to_rag step is in flight; expect 409
  • Run si-diag lancedb vacuum <db> --force on a healthy DB; expect audit log + successful vacuum
  • Confirm unit suite passes locally (uv run pytest tests/unit)

bd-mkt and others added 18 commits April 29, 2026 13:12
…, 5, 7)

Adds the persistence-layer foundation for the Worker refactor:

* New columns on RunStep:
  - `lease_token` — per-claim audit token. Set on claim, cleared on
    terminal status. complete_step / error_step / release_step are
    gated on lease match so a worker that gets reaped between claim
    and write cannot finalize a step on top of a fresh claimant.
  - `resource_key` — declarative cross-subsystem lock key (e.g.
    `rag:/abs/path/to/db`). Stamped on STORE-type steps at run-
    creation time.

* New `resourcelock` table (`ResourceLock`, `ResourceLockKind`).
  Single rendezvous point for everyone that writes a LanceDB —
  workflow `save_to_rag` steps, web vacuum endpoint, si-diag
  vacuum, end_group lifecycle vacuum. TTL-based with sweep.

* operations.py additions (Phase 1, 2, 5, 7):
  - `claim_next_step(worker_id, lease_token, allowed_types, batch_id)`
    — atomic UPDATE-FROM-SELECT-FOR-UPDATE-SKIP-LOCKED. Replaces
    `get_runnable_steps + set_step_status(RUNNING)`. The eligibility
    predicate also skips steps whose `resource_key` is currently
    held by a non-expired lock, so a worker never claims a step
    whose RAG-DB is being vacuumed by another holder.
  - `complete_step` / `error_step` / `release_step` — lease-gated
    terminal/release writes. Drop the step's resource lock in the
    same transaction.
  - `recompute_run_status` (Phase 7) — idempotent run-status
    recompute, decoupled from the step transition.
  - `try_complete_run_group` (Phase 6 prep) — atomic conditional
    update so GROUP_END fires exactly-once across workers.
  - `worker_heartbeat` / `delete_worker_checkin` /
    `reap_dead_workers(my_id, threshold)` — reaper now skips the
    caller, eliminating the self-reaping race; reaped workers'
    resource locks are cleared alongside.
  - Resource lock CRUD: `acquire_resource_lock`,
    `refresh_resource_lock`, `release_resource_lock`,
    `force_release_resource_lock`, `sweep_expired_resource_locks`.

* `_rag_resource_key_for_param` helper resolves the RAG-DB
  resource_key from a param set's store config, used at
  workflow-run creation to stamp `resource_key` on STORE steps.

* Resolves the `reset_failed` merge conflict by adopting the
  soft-status (FAILED + CANCELLED) variant and adding the
  `_SOFT_STEP_STATUSES` constant.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… event-driven lifecycle, metrics (phases 3, 4, 6, 8)

Replaces the module-global runner with a class-based orchestrator:

* `class Worker` — instance-scoped state. No module globals; two
  workers can coexist in one process for tests. Configurable per-
  type consumer pools:
    Worker(WorkerConfig(consumers={"parse": 4, "store": 8, "*": 2}))
  Each consumer claims via `operations.claim_next_step(allowed_types
  =[...])`, so per-type concurrency is bounded at the DB level
  without any in-process semaphore — and we never claim a step
  only to release it because of a local rate limit.

* Phase 4 — graceful shutdown. `Worker.stop(timeout)` signals
  consumers to stop claiming, waits for in-flight work, then
  cancels remaining consumers. Each consumer's CancelledError
  handler calls `operations.release_step(step_id, lease)` which
  resets the row to PENDING (gated on the lease so a stale
  finisher can't undo it). The worker also deletes its own
  checkin row so peers don't wait the full checkin timeout to
  notice it left. Restart-recovery latency drops from
  worker_checkin_timeout to "in-flight step duration" or
  "shutdown timeout," whichever is shorter.

* Phase 6 — event-driven lifecycle. `LifecycleBus` runs hooks on
  fire-and-forget tasks so a slow STEP_START handler can never
  block step execution. GROUP_END fires from a coordinator that
  subscribes to step-end events and consults
  `operations.try_complete_run_group(group_id)` — the SQL-side
  atomic conditional update guarantees exactly-once group-end
  even under N concurrent workers, replacing the TOCTOU window
  in the old `get_run_group_stats` snapshot read.

* Phase 8 — observability. Pluggable `Metrics` protocol with a
  default `LoggingMetrics` no-op. Counters: claim_attempts,
  claim_success, claim_idle, claim_error, claim_lost_race,
  step_completed, step_error, step_failed, step_released,
  worker_reaped, lease_lost, resource_lock_swept. Histograms:
  claim_duration, step_duration.

* Pure helpers `transition_allowed` / `elevate_terminal` extracted
  from the old `do_state_transition`. Worker-id ownership checks
  are now enforced at the SQL layer via the lease token, so the
  in-memory layer is just a rules table plus terminal elevation.

* Module-level `start_worker` / `stop_worker` / `get_worker_id`
  shims preserved (thin wrappers over a single private Worker)
  so the FastAPI lifespan and `si-cli worker` callers can update
  out-of-band; everything new should construct a Worker directly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…s locks

Cleans up the module-level concurrency primitives that were process-
local and unsafe under multiprocessing, and points every RAG-DB
writer at the new cross-subsystem ResourceLock.

* `lib/rag.py`:
  - Drop the per-DB `lock_map` (defaultdict of asyncio.Lock). Per-DB
    serialization is now enforced by the workflow's claim layer via
    `resource_key` and by `hold_rag_lock(...)` for direct callers.
  - Drop the lock around `check_rag_existence` — LanceDB read-only
    access is concurrent-safe.
  - Add `resource_key_for(db_path)` helper and `hold_rag_lock(...)`
    context manager that any direct-from-Python writer (CLI vacuum,
    lifecycle vacuum) can use to acquire the cross-subsystem lock.
  - `vacuum_db(db_name, sign, holder_kind, max_wait)` now wraps the
    operation in a ResourceLock; `max_wait=0` for fail-fast.

* `lib/docling.py`: remove the module-level `_http_sem`. `parse`
  step concurrency is now expressed by `Worker(consumers={"parse":
  N, ...})`. Operators preserving the old default should set
  `consumers["parse"]` to `settings.docling_concurrency`.

* `server/routes/lancedb.py` vacuum endpoint: routes through
  `vacuum_db(holder_kind=WEB, max_wait=0)`. New status codes:
  - 200 ok
  - 404 not_found
  - 409 locked (ResourceLock held by another writer)
  - 500 error (anything else)

* `si-diag lancedb vacuum`: holder_kind=CLI, fail-fast on lock
  conflict with a clear "pass --force to break the lock" hint;
  `--force` calls `force_release_resource_lock(...)` (audit
  logged) before retrying.

* `si-cli vacuum` / `si-cli vacuum-all`: holder_kind=CLI; vacuum-all
  iterates and skips DBs whose lock is currently held.

* FastAPI lifespan + `si-cli worker`: construct a `Worker` directly
  with a per-step-type consumer layout
  (`{"parse": docling_concurrency, "store": worker_task_count, "*":
  worker_task_count}`) instead of calling the old start_worker
  shim. Lifespan also runs `Worker.stop(30s)` on shutdown for
  graceful release.

* `wf/operations.create_workflow_run`: stamps `resource_key` on
  STORE-type steps using the param set's resolved store path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…/lease/lock APIs

* `test_runner.py`: rewritten against `transition_allowed` /
  `elevate_terminal` (pure rules table + ERROR→FAILED elevation).
  The legacy worker-id ownership check is gone — that invariant is
  now enforced at the SQL layer via lease tokens, so the in-memory
  layer doesn't need to know about it.

* `test_wf_claim.py` (new): covers
  - `claim_next_step` returns the highest-priority eligible step
  - claims skip steps whose `resource_key` is held by another
    holder
  - two workers never claim the same step
  - `complete_step` is gated on lease token; wrong lease is a no-op
  - `error_step` increments retry and elevates to FAILED at the
    threshold
  - `release_step` returns to PENDING and is re-claimable
  - `reap_dead_workers` skips the caller (regression test for the
    self-reaping race)
  - `reap_dead_workers` resets orphaned RUNNING steps and clears
    their lease tokens
  - end-to-end: a stale worker that wakes up after being reaped
    cannot complete its old step on top of a fresh claimant
  - ResourceLock acquire / refresh / release / sweep semantics
  - `recompute_run_status` (FAILED dominates; all-COMPLETED →
    COMPLETED)
  - `try_complete_run_group` returns FAILED when any step failed,
    None when work remains, and is idempotent

* `test_lancedb_routes.py::TestVacuum`: rewritten to mock the new
  `rag.vacuum_db` entry point. Adds `test_vacuum_locked` (409 from
  TimeoutError) and `test_vacuum_not_found` (404 from
  FileNotFoundError) cases.

Full unit suite: 568 passed, 72% coverage (up from 50% min).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds ``test_runner_internals.py``, lifting runner.py coverage from
26% to 100% (399 stmts, 102 branches, 0 partial). The file is
companion to ``test_runner.py`` (which tests the pure state-
machine helpers) and exercises the rest of the runner module by
mocking ``operations`` so each test stays focused.

Coverage breakdown:

* ``LoggingMetrics``: incr / observe at DEBUG.
* ``LifecycleEventEnvelope``: frozen-dataclass invariant.
* ``LifecycleBus``: subscribe + publish fan-out, multiple events,
  drain-with-no-pending fast path, subscriber exception swallowing.
* ``WorkerConfig``: defaults and required-field handling.
* ``Worker``: construction (defaults, empty consumers, custom
  metrics), id / lifecycle accessors, distinct ids across instances,
  ``_allowed_types_for`` partitioning (named pool, ``"*"`` with no
  named pools → None, ``"*"`` excluding named types), ``start`` is
  idempotent, ``stop`` when never started is a no-op, ``stop``
  drains tasks and clears state, ``stop`` swallows
  ``delete_worker_checkin`` errors, ``stop`` waits for in-flight
  steps to drain.
* ``Worker._run_step`` paths: success → STEP_START / ITEM_START /
  STEP_END; last step → ITEM_END; resource-key acquire success;
  resource-lock-race → release_step + return; lease lost on
  complete → metric + skip recompute; handler raises with retries
  left → STEP_END; handler raises retries-exhausted → STEP_FAILED;
  error_step returns None (lease lost) → metric + skip recompute;
  error_step itself raises → logged not propagated; CancelledError
  → release_step + reraise; release_step returns False → no
  step_released metric; release_step itself raises → logged but
  reraised; first DB lookup raises → no lifecycle publish (all
  context Nones).
* Background loops: each loop's success path, body-error logged-
  and-continued path, cancellation propagation, and the "interval
  elapsed without stop" branch (uses an ``asyncio.wait_for`` patch
  that disambiguates the loops' ``stop_event.wait()`` from the
  test wrapper's coroutine).
* Default lifecycle hooks: GROUP_END fires when
  ``try_complete_run_group`` returns a status; no-op when run group
  is still running; no-op for non-terminating events;
  ``_install_default_lifecycle_hooks`` is idempotent.
* ``_run_workflow_lifecycle_handlers``: lifecycle_events None,
  event not subscribed, success path records COMPLETED, handler
  raises records FAILED, history-update failure swallowed and
  logged, non-dict handler return wrapped in ``{"result": ...}``.
* ``build_coro`` / ``build_step_coro``: dispatch via step_type,
  extra_args override run_params, handler.parameters supply
  unbound args, batch=None branch.
* Module-level shims: get_worker_id when no default worker;
  start_worker(create_tasks=False) skips loops; start_worker is
  idempotent; start_worker(create_tasks=True) schedules loops;
  stop_worker when no default; stop_worker clears default;
  get_runnable_steps empty / default-top branches.
* ``do_state_transition`` legacy wrapper: confirms step_worker_id
  kwarg is no longer consulted (ownership moved to SQL lease
  layer).
* ``runner.datetime`` re-export survives.

Two helpers ``_patch_op`` and ``_patch_runtime_lookups`` keep the
``patch("soliplex.ingester.lib.wf.runner.operations.X", ...)``
boilerplate concise and within the project's 126-column line
limit. ``_reset_default_worker`` autouse fixture isolates the
module-level shim's ``_default_worker`` between tests.

Total: 91 new tests, runner.py 100% line + branch coverage.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
fix: use dynamic configuration to avoid unnecessary warnings about missing /run/secrets
feat: add pre-check for free disk space to avoid partial writes
feat: add clean-up actions in the event of io errors to avoid partial writes
perf: tweak splitting settings for better throughput
fix: sort param sets by id
chore: update uv in Dockerfile
tests: flag blocks no cover if impractical
fix: change to async behavior in hr
fix: factor out rag client for check_exists
fix race in lancedb locking
@bd-mkt bd-mkt merged commit 39e1d7f into main May 18, 2026
8 checks passed
@bd-mkt bd-mkt deleted the wf_refactor branch May 18, 2026 18:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant