Wf refactor#108
Merged
Merged
Conversation
…, 5, 7)
Adds the persistence-layer foundation for the Worker refactor:
* New columns on RunStep:
- `lease_token` — per-claim audit token. Set on claim, cleared on
terminal status. complete_step / error_step / release_step are
gated on lease match so a worker that gets reaped between claim
and write cannot finalize a step on top of a fresh claimant.
- `resource_key` — declarative cross-subsystem lock key (e.g.
`rag:/abs/path/to/db`). Stamped on STORE-type steps at run-
creation time.
* New `resourcelock` table (`ResourceLock`, `ResourceLockKind`).
Single rendezvous point for everyone that writes a LanceDB —
workflow `save_to_rag` steps, web vacuum endpoint, si-diag
vacuum, end_group lifecycle vacuum. TTL-based with sweep.
* operations.py additions (Phase 1, 2, 5, 7):
- `claim_next_step(worker_id, lease_token, allowed_types, batch_id)`
— atomic UPDATE-FROM-SELECT-FOR-UPDATE-SKIP-LOCKED. Replaces
`get_runnable_steps + set_step_status(RUNNING)`. The eligibility
predicate also skips steps whose `resource_key` is currently
held by a non-expired lock, so a worker never claims a step
whose RAG-DB is being vacuumed by another holder.
- `complete_step` / `error_step` / `release_step` — lease-gated
terminal/release writes. Drop the step's resource lock in the
same transaction.
- `recompute_run_status` (Phase 7) — idempotent run-status
recompute, decoupled from the step transition.
- `try_complete_run_group` (Phase 6 prep) — atomic conditional
update so GROUP_END fires exactly-once across workers.
- `worker_heartbeat` / `delete_worker_checkin` /
`reap_dead_workers(my_id, threshold)` — reaper now skips the
caller, eliminating the self-reaping race; reaped workers'
resource locks are cleared alongside.
- Resource lock CRUD: `acquire_resource_lock`,
`refresh_resource_lock`, `release_resource_lock`,
`force_release_resource_lock`, `sweep_expired_resource_locks`.
* `_rag_resource_key_for_param` helper resolves the RAG-DB
resource_key from a param set's store config, used at
workflow-run creation to stamp `resource_key` on STORE steps.
* Resolves the `reset_failed` merge conflict by adopting the
soft-status (FAILED + CANCELLED) variant and adding the
`_SOFT_STEP_STATUSES` constant.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… event-driven lifecycle, metrics (phases 3, 4, 6, 8)
Replaces the module-global runner with a class-based orchestrator:
* `class Worker` — instance-scoped state. No module globals; two
workers can coexist in one process for tests. Configurable per-
type consumer pools:
Worker(WorkerConfig(consumers={"parse": 4, "store": 8, "*": 2}))
Each consumer claims via `operations.claim_next_step(allowed_types
=[...])`, so per-type concurrency is bounded at the DB level
without any in-process semaphore — and we never claim a step
only to release it because of a local rate limit.
* Phase 4 — graceful shutdown. `Worker.stop(timeout)` signals
consumers to stop claiming, waits for in-flight work, then
cancels remaining consumers. Each consumer's CancelledError
handler calls `operations.release_step(step_id, lease)` which
resets the row to PENDING (gated on the lease so a stale
finisher can't undo it). The worker also deletes its own
checkin row so peers don't wait the full checkin timeout to
notice it left. Restart-recovery latency drops from
worker_checkin_timeout to "in-flight step duration" or
"shutdown timeout," whichever is shorter.
* Phase 6 — event-driven lifecycle. `LifecycleBus` runs hooks on
fire-and-forget tasks so a slow STEP_START handler can never
block step execution. GROUP_END fires from a coordinator that
subscribes to step-end events and consults
`operations.try_complete_run_group(group_id)` — the SQL-side
atomic conditional update guarantees exactly-once group-end
even under N concurrent workers, replacing the TOCTOU window
in the old `get_run_group_stats` snapshot read.
* Phase 8 — observability. Pluggable `Metrics` protocol with a
default `LoggingMetrics` no-op. Counters: claim_attempts,
claim_success, claim_idle, claim_error, claim_lost_race,
step_completed, step_error, step_failed, step_released,
worker_reaped, lease_lost, resource_lock_swept. Histograms:
claim_duration, step_duration.
* Pure helpers `transition_allowed` / `elevate_terminal` extracted
from the old `do_state_transition`. Worker-id ownership checks
are now enforced at the SQL layer via the lease token, so the
in-memory layer is just a rules table plus terminal elevation.
* Module-level `start_worker` / `stop_worker` / `get_worker_id`
shims preserved (thin wrappers over a single private Worker)
so the FastAPI lifespan and `si-cli worker` callers can update
out-of-band; everything new should construct a Worker directly.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…s locks
Cleans up the module-level concurrency primitives that were process-
local and unsafe under multiprocessing, and points every RAG-DB
writer at the new cross-subsystem ResourceLock.
* `lib/rag.py`:
- Drop the per-DB `lock_map` (defaultdict of asyncio.Lock). Per-DB
serialization is now enforced by the workflow's claim layer via
`resource_key` and by `hold_rag_lock(...)` for direct callers.
- Drop the lock around `check_rag_existence` — LanceDB read-only
access is concurrent-safe.
- Add `resource_key_for(db_path)` helper and `hold_rag_lock(...)`
context manager that any direct-from-Python writer (CLI vacuum,
lifecycle vacuum) can use to acquire the cross-subsystem lock.
- `vacuum_db(db_name, sign, holder_kind, max_wait)` now wraps the
operation in a ResourceLock; `max_wait=0` for fail-fast.
* `lib/docling.py`: remove the module-level `_http_sem`. `parse`
step concurrency is now expressed by `Worker(consumers={"parse":
N, ...})`. Operators preserving the old default should set
`consumers["parse"]` to `settings.docling_concurrency`.
* `server/routes/lancedb.py` vacuum endpoint: routes through
`vacuum_db(holder_kind=WEB, max_wait=0)`. New status codes:
- 200 ok
- 404 not_found
- 409 locked (ResourceLock held by another writer)
- 500 error (anything else)
* `si-diag lancedb vacuum`: holder_kind=CLI, fail-fast on lock
conflict with a clear "pass --force to break the lock" hint;
`--force` calls `force_release_resource_lock(...)` (audit
logged) before retrying.
* `si-cli vacuum` / `si-cli vacuum-all`: holder_kind=CLI; vacuum-all
iterates and skips DBs whose lock is currently held.
* FastAPI lifespan + `si-cli worker`: construct a `Worker` directly
with a per-step-type consumer layout
(`{"parse": docling_concurrency, "store": worker_task_count, "*":
worker_task_count}`) instead of calling the old start_worker
shim. Lifespan also runs `Worker.stop(30s)` on shutdown for
graceful release.
* `wf/operations.create_workflow_run`: stamps `resource_key` on
STORE-type steps using the param set's resolved store path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…/lease/lock APIs
* `test_runner.py`: rewritten against `transition_allowed` /
`elevate_terminal` (pure rules table + ERROR→FAILED elevation).
The legacy worker-id ownership check is gone — that invariant is
now enforced at the SQL layer via lease tokens, so the in-memory
layer doesn't need to know about it.
* `test_wf_claim.py` (new): covers
- `claim_next_step` returns the highest-priority eligible step
- claims skip steps whose `resource_key` is held by another
holder
- two workers never claim the same step
- `complete_step` is gated on lease token; wrong lease is a no-op
- `error_step` increments retry and elevates to FAILED at the
threshold
- `release_step` returns to PENDING and is re-claimable
- `reap_dead_workers` skips the caller (regression test for the
self-reaping race)
- `reap_dead_workers` resets orphaned RUNNING steps and clears
their lease tokens
- end-to-end: a stale worker that wakes up after being reaped
cannot complete its old step on top of a fresh claimant
- ResourceLock acquire / refresh / release / sweep semantics
- `recompute_run_status` (FAILED dominates; all-COMPLETED →
COMPLETED)
- `try_complete_run_group` returns FAILED when any step failed,
None when work remains, and is idempotent
* `test_lancedb_routes.py::TestVacuum`: rewritten to mock the new
`rag.vacuum_db` entry point. Adds `test_vacuum_locked` (409 from
TimeoutError) and `test_vacuum_not_found` (404 from
FileNotFoundError) cases.
Full unit suite: 568 passed, 72% coverage (up from 50% min).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds ``test_runner_internals.py``, lifting runner.py coverage from
26% to 100% (399 stmts, 102 branches, 0 partial). The file is
companion to ``test_runner.py`` (which tests the pure state-
machine helpers) and exercises the rest of the runner module by
mocking ``operations`` so each test stays focused.
Coverage breakdown:
* ``LoggingMetrics``: incr / observe at DEBUG.
* ``LifecycleEventEnvelope``: frozen-dataclass invariant.
* ``LifecycleBus``: subscribe + publish fan-out, multiple events,
drain-with-no-pending fast path, subscriber exception swallowing.
* ``WorkerConfig``: defaults and required-field handling.
* ``Worker``: construction (defaults, empty consumers, custom
metrics), id / lifecycle accessors, distinct ids across instances,
``_allowed_types_for`` partitioning (named pool, ``"*"`` with no
named pools → None, ``"*"`` excluding named types), ``start`` is
idempotent, ``stop`` when never started is a no-op, ``stop``
drains tasks and clears state, ``stop`` swallows
``delete_worker_checkin`` errors, ``stop`` waits for in-flight
steps to drain.
* ``Worker._run_step`` paths: success → STEP_START / ITEM_START /
STEP_END; last step → ITEM_END; resource-key acquire success;
resource-lock-race → release_step + return; lease lost on
complete → metric + skip recompute; handler raises with retries
left → STEP_END; handler raises retries-exhausted → STEP_FAILED;
error_step returns None (lease lost) → metric + skip recompute;
error_step itself raises → logged not propagated; CancelledError
→ release_step + reraise; release_step returns False → no
step_released metric; release_step itself raises → logged but
reraised; first DB lookup raises → no lifecycle publish (all
context Nones).
* Background loops: each loop's success path, body-error logged-
and-continued path, cancellation propagation, and the "interval
elapsed without stop" branch (uses an ``asyncio.wait_for`` patch
that disambiguates the loops' ``stop_event.wait()`` from the
test wrapper's coroutine).
* Default lifecycle hooks: GROUP_END fires when
``try_complete_run_group`` returns a status; no-op when run group
is still running; no-op for non-terminating events;
``_install_default_lifecycle_hooks`` is idempotent.
* ``_run_workflow_lifecycle_handlers``: lifecycle_events None,
event not subscribed, success path records COMPLETED, handler
raises records FAILED, history-update failure swallowed and
logged, non-dict handler return wrapped in ``{"result": ...}``.
* ``build_coro`` / ``build_step_coro``: dispatch via step_type,
extra_args override run_params, handler.parameters supply
unbound args, batch=None branch.
* Module-level shims: get_worker_id when no default worker;
start_worker(create_tasks=False) skips loops; start_worker is
idempotent; start_worker(create_tasks=True) schedules loops;
stop_worker when no default; stop_worker clears default;
get_runnable_steps empty / default-top branches.
* ``do_state_transition`` legacy wrapper: confirms step_worker_id
kwarg is no longer consulted (ownership moved to SQL lease
layer).
* ``runner.datetime`` re-export survives.
Two helpers ``_patch_op`` and ``_patch_runtime_lookups`` keep the
``patch("soliplex.ingester.lib.wf.runner.operations.X", ...)``
boilerplate concise and within the project's 126-column line
limit. ``_reset_default_worker`` autouse fixture isolates the
module-level shim's ``_default_worker`` between tests.
Total: 91 new tests, runner.py 100% line + branch coverage.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
fix: use dynamic configuration to avoid unnecessary warnings about missing /run/secrets feat: add pre-check for free disk space to avoid partial writes feat: add clean-up actions in the event of io errors to avoid partial writes perf: tweak splitting settings for better throughput
fix: sort param sets by id chore: update uv in Dockerfile
tests: flag blocks no cover if impractical
fix: change to async behavior in hr fix: factor out rag client for check_exists
fix race in lancedb locking
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Workflow engine refactor: class-based Worker, atomic claim + lease tokens, ResourceLock
Summary
This branch is primarily a foundational refactor of the workflow execution engine: it replaces a module-global runner with a class-based
Workerthat owns typed consumer pools, lease-gated terminal writes, and a cross-subsystemResourceLocktable. The new persistence layer eliminates several long-standing concurrency bugs in the original runner (process-local locking, TOCTOU on group completion, no protection against stale workers double-finalizing steps). A handful of smaller fixes ride along: compression for filesystem artifacts, a free-disk-space pre-check, async haiku.rag wiring, docling refactor, and broad documentation updates.Workflow engine refactor
Implemented across phases 1–8 (see commits
f0728ab,545a1a7,44631cd) plus a final atomic-claim race fix.Persistence layer (
wf/operations.py)claim_next_step(worker_id, lease_token, allowed_types=…)replacesget_runnable_steps + set_step_status(RUNNING)with a singleUPDATE-FROM-SELECT-FOR-UPDATE-SKIP-LOCKED. Per-type concurrency is bounded at the SQL layer, eliminating in-process semaphores.runstep.lease_token.complete_step/error_step/release_stepare gated on the lease, so a worker reaped between claim and write can never bounce a fresh claimant.ResourceLocktable — cross-subsystem rendezvous keyed byresource_key(typicallyrag:<abs-db-path>). All RAG-DB writers (workflowsave_to_ragsteps, web vacuum endpoint,si-diagCLI,end_grouplifecycle vacuums) coordinate here. TTL-based; expired rows are swept by a background loop.resource_key, the matchingResourceLockrow isINSERTed in the same transaction as the claim. The unique PK constraint serializes concurrent claims targeting the same key (e.g. multiplesave_to_ragsteps for the same LanceDB); the loser rolls back the whole claim and returnsNone. Closes the residual race window that READ COMMITTED isolation left open between two claim transactions.try_complete_run_group— atomic conditionalUPDATEsoGROUP_ENDfires exactly once across N workers. Replaces the TOCTOU window in the oldget_run_group_statssnapshot read.reap_dead_workers(my_id, threshold)always excludes the caller, eliminating the self-reaping race where a stalled checkin loop would let a worker reset its own in-flight steps.Orchestration layer (
wf/runner.py)class Worker— instance-scoped state; multiple workers can coexist in one process. Configurable per-type consumer pools:Worker(WorkerConfig(consumers={"parse": 4, "store": 8, "*": 2})).Worker.stop(timeout)waits for in-flight steps, then cancels remaining consumers. Cancelled consumers callrelease_stepso rows return toPENDINGimmediately rather than waitingWORKER_CHECKIN_TIMEOUT. The worker also deletes its ownWorkerCheckinrow so peers see the departure right away.LifecycleBus— fire-and-forget tasks so a slowSTEP_STARThandler can never block step execution.Metricsprotocol — counters (claim_attempts,claim_success,claim_idle,claim_lost_race,step_completed,step_error,step_failed,step_released,worker_reaped,lease_lost,resource_lock_swept) and histograms (claim_duration,step_duration). DefaultLoggingMetricsis a DEBUG no-op; production wires Prometheus._run_steprace-releases on a resource lock (rare defense-in-depth path after atomic claim), the consumer applies idle-style exponential backoff so it doesn't spin-claim until the holder finishes.poll_backoff_maxtuned from 5s → 3s to match typical LanceDB write durations.Caller wiring
lib/rag.py— dropped the per-DBlock_map. Addedresource_key_for(db_path)andhold_rag_lock(...)context manager for direct-from-Python writers.vacuum_db(db_name, sign, holder_kind, max_wait)wraps the operation in aResourceLock;max_wait=0for fail-fast.lib/docling.py— module-level HTTP semaphore removed; parse concurrency is now expressed by theparseconsumer pool size.server/routes/lancedb.py— vacuum endpoint returns 200 / 404 / 409 (lock held by another writer) / 500. Routes throughvacuum_db(holder_kind=WEB, max_wait=0).si-diag lancedb vacuum—--forceflag callsforce_release_resource_lock(...)(audit-logged) to break a stuck lock.si-cli vacuum/vacuum-all—holder_kind=CLI;vacuum-allskips DBs whose lock is currently held.si-cli workerconstruct aWorkerdirectly with{"parse": docling_concurrency, "store": worker_task_count, "*": worker_task_count}and callWorker.stop(30s)on shutdown.Database migration
New migration
a17c4d9b3e02_wf_refactoradds:runstep.lease_token(nullableVARCHAR)runstep.resource_key(nullableVARCHAR,ix_runstep_resource_keyindex)resourcelocktable +ix_resourcelock_expires_atindexA drop-in SQL upgrade script for environments where Alembic can't run in-place is committed at
scripts/wf_refactor_upgrade.sql; the matching afsoc-rag init upgrade script is atafsoc-rag/docker/pgsql-init/upgrade_scripts/afsoc_ingester_002.sql.Other improvements
c439bd6) — newFILE_COMPRESSION_ARTIFACTS/FILE_COMPRESSION_LEVELsettings opt-in zstd compression for filesystem-backed artifact types.f79b3df) — workflows refuse to start steps that would partially fill the disk; cleanup actions run on I/O errors to avoid partial writes.9a09980) — locked to>0.44;check_existsfactored out of the rag client.c439bd6) — predictable ordering in CLI/diag outputs./run/secretshandling —Settingsonly opts into the secrets directory if it actually exists, suppressing spurious warnings in non-container environments.uvupgrade in the Dockerfile.Documentation
docs/ARCHITECTURE.md— rewrote §3 Worker System and the execution-flow steps to cover typed pools, lease tokens,ResourceLock, graceful shutdown, metrics, and exactly-once group completion.docs/WORKFLOWS.md— replaced "Worker Processing" with the atomic-claim flow; added Consumer Pools, Graceful Shutdown, Metrics, and resource-lock troubleshooting sections.docs/DATABASE.md—lease_tokenandresource_keycolumns onRunStep, newResourceLocktable,ResourceLockKindenum,CANCELLEDinRunStatus, indexes from the migration.docs/API.md—/lancedb/vacuum404 / 409 / 500 response shapes andResourceLockbehavior.docs/CLI.md—si-diag lancedbsubcommand docs (vacuum --force,vacuum-all,verify); updatedsi-cli vacuumto mention the cross-subsystem lock.docs/CONFIGURATION.md— rewroteDOCLING_CONCURRENCY/WORKER_TASK_COUNT/INGEST_WORKER_CONCURRENCYfor the new consumer-pool semantics; addedFILE_COMPRESSION_*settings.Tests
tests/unit/test_runner.py— rewritten against the new pure helpers (transition_allowed,elevate_terminal).tests/unit/test_runner_internals.py(new, +1487 lines) — liftsrunner.pycoverage from 26% to 100%. CoversLifecycleBus,Workerconstruction/start/stop/cancellation, consumer-loop backoff (including race-release), heartbeat / reaper / lock-sweeper loops, default lifecycle hooks, module-level shim parity.tests/unit/test_wf_claim.py(new, +586 lines) —claim_next_steppriority/skip-lock/resource-key behavior; atomic lock insert on claim;acquire_resource_lockidempotence for the same holder; lease-gated terminal writes; reaper self-skip and orphan reset; end-to-end stale-worker block;recompute_run_statusandtry_complete_run_groupsemantics;_rag_resource_key_for_paramstamping at workflow-run creation.tests/unit/test_commit_rollback_paths.py(new, +1066 lines) — exhaustive transaction-handling coverage.tests/unit/test_lancedb_routes.py— vacuum endpoint mocks the newrag.vacuum_dbentry point and asserts 404 / 409 paths.741 unit tests pass; full branch coverage on
runner.py; suite-wide coverage up from 50% min → 72%.Operator notes
a17c4d9b3e02_wf_refactor(or the SQL drop-in) before starting the new worker. Running the new worker against the old schema will error on the missinglease_token/resource_keycolumns andresourcelocktable.DOCLING_CONCURRENCYsemantics changed: was an in-process HTTP semaphore; is now the size of the dedicatedparseconsumer pool. Behavior is equivalent for the defaultparseworkload, but operators who relied on the global cap (INGEST_WORKER_CONCURRENCY) should review their pool sizing.save_to_ragsteps, web vacuum, CLI vacuum, and lifecycle vacuum now coordinate viaResourceLock. The web endpoint is fail-fast (409); the CLI defaults to wait-forever;si-diag lancedb vacuum --forceis the escape hatch for a genuinely stuck lock.Test plan
/api/v1/lancedb/vacuumwhile a save_to_rag step is in flight; expect 409si-diag lancedb vacuum <db> --forceon a healthy DB; expect audit log + successful vacuumunitsuite passes locally (uv run pytest tests/unit)