Workflow CPU-contention & locking cleanup #110

Open
bd-mkt wants to merge 2 commits into main from wf_contention_fix

bd-mkt commented May 21, 2026

Summary

Under CPU contention (heavy parses, embedding loops, JSON serialization)
the workflow runner's locking layer was failing in a non-obvious way: a
single save_to_rag step that normally takes 10 s would extend to
~1000 s, with logs like lost resource lock ... during heartbeat right
before the slow completion. Root-cause analysis identified two
independent problems:

  1. The locking architecture had three TTL-racing actors (lock TTL +
    heartbeat refresh + periodic sweeper). When the event loop was
    starved by sync CPU work, the heartbeat would miss its refresh
    window, the sweeper would delete the lock row, and a sibling
    consumer could then claim a contending step — turning a single
    serialized writer into two concurrent writers fighting at the
    LanceDB layer.
  2. The CPU work that was doing the starving was avoidable. Every
    workflow step had at least one sync, pure-Python pass over a
    large pydantic / JSON / pypdf structure that blocked the event
    loop for tens of seconds at a time.
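
The starvation mechanism in problem 1 can be reproduced in a few lines. This is a minimal sketch, not the runner's code: `heartbeat`, `max_gap`, and `run_with_blocking_step` are hypothetical names, and `time.sleep` stands in for a sync parse running on the event loop.

```python
import asyncio
import time


async def heartbeat(ticks, interval=0.01):
    # Record a timestamp every `interval` seconds, like a lock-refresh loop.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


def max_gap(ticks):
    # Longest pause between two consecutive heartbeat ticks, in seconds.
    return max(b - a for a, b in zip(ticks, ticks[1:]))


async def run_with_blocking_step(block_for=0.3):
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0.05)  # heartbeat ticks normally
    time.sleep(block_for)      # sync work on the loop: no coroutine can run
    await asyncio.sleep(0.05)  # heartbeat resumes
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return max_gap(ticks)
```

Calling `asyncio.run(run_with_blocking_step(0.3))` returns a gap of at least the blocking time, even though the heartbeat asked to run every 10 ms. Any refresh deadline shorter than the blocking pass is missed in the same way.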

This PR addresses both, plus a handful of unrelated bugs in
docling.py that surfaced during the review (the file was in an
unbuildable intermediate state, with a duplicate function name
shadowing the public symbol every caller imports).

Changes

1. Workflow runner locking — simplified to fit single-process reality

Today this codebase typically runs one worker process; multi-process
operation is a future goal. The previous design optimized for that
future at the cost of the present.

  • In-process asyncio.Lock keyed by resource_key
    (runner.py). Two consumer
    coroutines targeting the same RAG-DB path now serialize on an
    in-memory mutex with zero DB I/O and no time-based semantics — so
    CPU starvation cannot widen a race window.
  • Dropped the TTL/heartbeat-refresh/sweeper machinery for
    WORKER-held ResourceLock rows. Worker rows now carry a
    far-future sentinel expires_at and are cleared by
    complete_step / error_step / release_step / reap_dead_workers
    in the same transaction as the step terminal write. Dead-worker
    recovery still flows through WorkerCheckin and the reaper.
  • CLI / web / lifecycle holders unchanged. Those callers go
    through hold_rag_lock / acquire_resource_lock with real TTLs
    — appropriate for short-lived holders without a heartbeat.
  • Removed the redundant acquire_resource_lock call in _run_step.
    The claim layer (claim_next_step) already inserts the row
    atomically with the status update; the post-claim acquire was a
    no-op refresh on every step.
  • WorkerConfig.resource_lock_ttl removed. No longer used.
  • Deleted 50+ lines of background-task code (_lock_sweeper_loop,
    the heartbeat's per-lock refresh inner loop, the "lost lock"
    warning log path that never aborted the step).
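
The in-process mutex from the first bullet can be sketched roughly as follows. This is an illustration under assumptions, not the code in runner.py: `KeyedLocks` and `run_step` are hypothetical names, and `asyncio.sleep` stands in for the step body.

```python
import asyncio
from collections import defaultdict


class KeyedLocks:
    """One asyncio.Lock per resource_key, created on first use."""

    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)

    def for_key(self, resource_key):
        return self._locks[resource_key]


async def run_step(locks, resource_key, log, name, work_s=0.02):
    async def body():
        log.append((name, "start"))
        await asyncio.sleep(work_s)  # stand-in for the real step work
        log.append((name, "end"))

    if resource_key is None:
        # No-key path: never touches the lock dict.
        await body()
        return
    # Same key: waiters queue on the lock. No DB I/O, no TTL, no clock.
    async with locks.for_key(resource_key):
        await body()
```

Two steps with the same key run strictly one after the other, while steps with different keys interleave. Because the lock has no expiry, a starved event loop delays the waiter but cannot hand the resource to a second holder. One design caveat of this sketch: the dict grows by one entry per distinct key and is never pruned.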

The cross-process correctness floor — atomic
ResourceLock insert + unique PK + subq_running_rk exclusion in
claim_next_step — is preserved unchanged, so a future second worker
sees the row immediately and skips contending steps.
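
The cross-process floor rests on a unique primary key plus a single transaction. A minimal sketch of that idea with sqlite3 follows; the schema, the `FAR_FUTURE` value, and the simplified `claim_step` / `complete_step` signatures are assumptions for illustration, and the real claim_next_step additionally excludes contending steps through its subq_running_rk subquery.

```python
import sqlite3

FAR_FUTURE = "9999-12-31T23:59:59"  # hypothetical sentinel for worker rows


def make_db():
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE resource_lock ("
        " resource_key TEXT PRIMARY KEY,"
        " holder TEXT NOT NULL,"
        " expires_at TEXT NOT NULL)"
    )
    db.execute(
        "CREATE TABLE step ("
        " id INTEGER PRIMARY KEY,"
        " resource_key TEXT,"
        " status TEXT NOT NULL)"
    )
    return db


def claim_step(db, step_id, resource_key, worker_id):
    """Insert the lock row and mark the step running in one transaction."""
    try:
        with db:  # commits on success, rolls back on exception
            db.execute(
                "INSERT INTO resource_lock VALUES (?, ?, ?)",
                (resource_key, worker_id, FAR_FUTURE),
            )
            db.execute(
                "UPDATE step SET status = 'running' WHERE id = ?", (step_id,)
            )
        return True
    except sqlite3.IntegrityError:
        # Another holder already owns this resource_key.
        return False


def complete_step(db, step_id, resource_key):
    """Terminal status write and lock release in the same transaction."""
    with db:
        db.execute("UPDATE step SET status = 'done' WHERE id = ?", (step_id,))
        db.execute(
            "DELETE FROM resource_lock WHERE resource_key = ?", (resource_key,)
        )
```

A second claim on the same key fails on the primary-key constraint and leaves its step untouched; once the holder completes, the key is free again without any sweeper involvement.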

2. CPU-heavy work moved off the event loop

Every workflow step that touched a large pydantic / JSON / pypdf
structure on the event loop is now wrapped in asyncio.to_thread.

In workflow.py, four new sync helpers were added next to read_file:

  • _chunks_from_bytes(bytes) -> list[Chunk] — JSON-decode + per-item
    Chunk.model_validate.
  • _chunks_to_bytes(chunks) -> bytes — per-item model_dump +
    JSON-encode.
  • _extract_pdf_metadata(bytes) -> dict — pypdf parse + page count +
    selected /Info keys.
  • _md5_hex(bytes) -> str — MD5 hex digest.

Call-site changes:

  • split_parse_pdf_file: smart_split_to_files,
    BatchProcessor.execute_parallel, per-chunk json.loads,
    merge_from_results, and final Docling serialization
    (export_to_markdown + model_dump_json) all run on worker
    threads.
  • validate_document: the pypdf.PdfReader parse runs on a
    thread via _extract_pdf_metadata. Also fixes a pre-existing bug
    where /Subject was listed twice in the metadata loop.
  • chunk_document: DoclingDocument.model_validate_json and the
    chunk → JSON-bytes serialization run on threads.
  • embed_document: chunk deserialize and embed-chunk serialize
    both go through the helpers via to_thread.
  • save_to_rag: chunk deserialize and hashlib.md5 move
    off-loop.

The dominant per-step blocker (DoclingDocument.model_validate_json
on a multi-MB Docling document) no longer pins the loop, so
heartbeats, lifecycle subscribers, and sibling consumer polls stay
responsive while a parse is in flight.
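
The offload pattern itself is small. The sketch below assumes a hypothetical `parse_big_document` in place of the real Docling parse and counts heartbeat ticks to show the loop staying live. The `time.sleep` stands in for parse time and releases the GIL for its whole duration; real pure-Python work gives the GIL up only at interpreter switch intervals, so measured gains in production may be smaller than this toy suggests.

```python
import asyncio
import json
import time


def parse_big_document(raw: bytes) -> dict:
    # Hypothetical stand-in for a slow synchronous parse.
    time.sleep(0.2)
    return json.loads(raw)


async def heartbeat(ticks, interval=0.01):
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def parse_off_loop(raw: bytes):
    ticks = []
    hb = asyncio.create_task(heartbeat(ticks))
    # The parse runs on a worker thread; the loop keeps scheduling coroutines.
    doc = await asyncio.to_thread(parse_big_document, raw)
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return doc, len(ticks)
```

`asyncio.run(parse_off_loop(b'{"pages": 3}'))` returns the parsed document together with the number of heartbeat ticks that fired while the parse was in flight.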

3. docling.py rewrite

The file was in a broken intermediate state when the PR opened: a
shadowed function name meant the public docling_convert symbol
every caller imports didn't exist, and the semaphore that was
supposed to gate docling-serve requests was dead code.

  • Single public docling_convert with @retry outside and
    async with get_docling_sem() inside. Failing attempts release
    their slot during the exponential-backoff wait — fixes a
    potential deadlock where a backed-up docling server could pin
    every slot.
  • Post-response CPU work moved outside the semaphore. do_repl
    (recursive image-placeholder substitution) and the whole-tree
    json.dumps re-serialization no longer run while holding a
    docling-serve slot. They're split into a sync _process_result
    helper that callers offload to asyncio.to_thread.
  • Big-result response.json() runs on a worker thread. The
    Docling response body can be many MB; parsing it on the loop was
    blocking sibling coroutines for seconds.
  • is_html bounded to a fixed window. Previously the <body
    check scanned the entire buffer, wasting up to 50 MB of
    bytes.find work on every non-HTML payload. It is now bounded to
    the first 8 KB; the <html check stays at the original 100 bytes.
  • Hardcoded Semaphore(12) in split_parse_pdf_file removed.
    Process-wide gating now lives in one place (docling_convert)
    with one tunable (settings.docling_concurrency).

4. Test fixture for docling semaphore

asyncio.Semaphore binds to whichever event loop is current at
construction time, which trips per-test loops with
RuntimeError: ... bound to a different loop. Added an autouse
fixture in tests/conftest.py that resets
docling._docling_sem = None before and after each test.

Tests

  • 44 new unit tests added (820 total, up from 776):
    • tests/unit/test_workflow_helpers.py — 21 tests covering all
      four new sync helpers (roundtrip, empty input, duck-typed
      model_dump, in-process PDF generation for _extract_pdf_metadata,
      known MD5 values, etc.).
    • tests/unit/test_docling.py — 23 tests covering is_html (the
      new 8 KB bound, the preserved 100-byte <html window, the
      regression case where <body past 8 KB is no longer detected),
      do_repl, get_docling_sem (lazy singleton, respects settings),
      _process_result (success/failure, placeholder mode, both output
      formats).
    • tests/unit/test_runner_internals.py — 4 new tests for the
      in-process asyncio.Lock (_run_step doesn't call
      acquire_resource_lock, same resource_key serializes,
      different resource_keys run concurrently, no-key path skips
      the lock dict).
  • 10 obsolete tests removed: heartbeat lock refresh, lost-lock
    warning, sweeper-loop tests, race-release backoff, defensive
    acquire path. Net test count: 820 − 776 = +44 (the
    workflow_helpers and docling files are entirely new).
  • Coverage: 85.76% (was 85.31%). runner.py is at 98%.
  • Lint and format clean across all modified files.
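
The is_html cases listed for tests/unit/test_docling.py can be pictured against a small stand-in implementation. The window sizes come from this PR; the function body and the case-insensitive match are assumptions, since the actual code is not shown here.

```python
HTML_TAG_WINDOW = 100       # bytes scanned for "<html"
BODY_TAG_WINDOW = 8 * 1024  # bytes scanned for "<body"


def is_html(data: bytes) -> bool:
    # Slice first so the scan cost is bounded regardless of payload size.
    head = data[:BODY_TAG_WINDOW].lower()  # lowercasing is an assumption
    return b"<html" in head[:HTML_TAG_WINDOW] or b"<body" in head
```

Under this sketch, `is_html(b" " * 9000 + b"<body>")` is False, which corresponds to the regression case named above: a <body tag that appears only after the first 8 KB is no longer detected.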

Docs

ARCHITECTURE.md, DATABASE.md, and WORKFLOWS.md updated to
describe the new model: in-process serialization for workers, sentinel
expires_at on worker-held ResourceLock rows, removed
claim_lost_race / resource_lock_swept metrics, and the corrected
narrative around CLI/web/lifecycle holders continuing to use real TTLs.

Backwards compatibility

  • Schema: no migrations. ResourceLock.expires_at is the same
    column, just holds a far-future sentinel for WORKER rows. The
    index on expires_at is still useful for CLI/web/lifecycle
    sweeps.
  • API: docling.docling_convert now exists as the public symbol
    (it previously didn't, due to the rename bug, so this is strictly
    a fix). All other public signatures unchanged.
  • Settings: docling_concurrency is now actually honored as the
    process-wide gate. Previously the hardcoded Semaphore(12) in
    split_parse_pdf_file overrode it within a single PDF parse.
    Deployments that previously relied on the implicit 12 should
    reconsider their docling_concurrency value.

Operational notes

  • Single-process invariant: this PR optimizes for one worker
    process per docling-serve. The cross-process correctness floor is
    preserved for future scale-out, but the docling semaphore is
    still per-process — multiple workers against a shared
    docling-serve will see aggregate N × docling_concurrency
    parallelism at the server. Rate-limit at the proxy or in
    docling-serve's queue if you scale out.
  • Worker recovery latency for a process that dies mid-step is
    now bounded by worker_checkin_timeout (default 600 s) rather
    than resource_lock_ttl (was 300 s). Tune worker_checkin_timeout
    if 10 minutes is too long for your deployment.
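
To make the recovery bound concrete, here is an in-memory sketch of check-in based reaping. It is not the project's reap_dead_workers: `reap_stale_workers` and the two dicts (`checkins` mapping worker id to last check-in time, `locks` mapping resource_key to holder) are hypothetical simplifications of the WorkerCheckin and ResourceLock tables.

```python
def reap_stale_workers(checkins, locks, now, worker_checkin_timeout=600.0):
    """Release locks held by workers whose last check-in is too old."""
    dead = {
        worker
        for worker, last_seen in checkins.items()
        if now - last_seen > worker_checkin_timeout
    }
    released = sorted(key for key, holder in locks.items() if holder in dead)
    for key in released:
        del locks[key]      # the resource becomes claimable again
    for worker in dead:
        del checkins[worker]
    return released
```

A lock held by a crashed worker stays in place until that worker's check-in ages past the timeout, so lowering worker_checkin_timeout shortens recovery at the cost of a higher chance of reaping a worker that is only slow.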
