Workflow CPU-contention & locking cleanup#110
Open
bd-mkt wants to merge 2 commits into
Open
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Under CPU contention (heavy parses, embedding loops, JSON serialization)
the workflow runner's locking layer was failing in a non-obvious way: a
single
save_to_ragstep that normally takes 10 s would extend to~1000 s, with logs like
lost resource lock ... during heartbeatrightbefore the slow completion. Root-cause analysis identified two
independent problems:
heartbeat refresh + periodic sweeper). When the event loop was
starved by sync CPU work, the heartbeat would miss its refresh
window, the sweeper would delete the lock row, and a sibling
consumer could then claim a contending step — turning a single
serialized writer into two concurrent writers fighting at the
LanceDB layer.
workflow step had at least one sync, pure-Python pass over a
large pydantic / JSON / pypdf structure that blocked the event
loop for tens of seconds at a time.
This PR addresses both, plus a handful of unrelated bugs in
docling.pythat surfaced during the review (the file was in anunbuildable intermediate state, with a duplicate function name
shadowing the public symbol every caller imports).
Changes
1. Workflow runner locking — simplified to fit single-process reality
Today this codebase typically runs one worker process; multi-process is
an aspirational future. The previous design optimized for the future at
the cost of the present.
asyncio.Lockkeyed byresource_key(runner.py). Two consumer
coroutines targeting the same RAG-DB path now serialize on an
in-memory mutex with zero DB I/O and no time-based semantics — so
CPU starvation cannot widen a race window.
WORKER-heldResourceLockrows. Worker rows now carry afar-future sentinel
expires_atand are cleared bycomplete_step/error_step/release_step/reap_dead_workersin the same transaction as the step terminal write. Dead-worker
recovery still flows through
WorkerCheckinand the reaper.through
hold_rag_lock/acquire_resource_lockwith real TTLs— appropriate for short-lived holders without a heartbeat.
acquire_resource_lockcall in_run_step.The claim layer (
claim_next_step) already inserts the rowatomically with the status update; the post-claim acquire was a
no-op refresh on every step.
WorkerConfig.resource_lock_ttlremoved. No longer used._lock_sweeper_loop,the heartbeat's per-lock refresh inner loop, the "lost lock"
warning log path that never aborted the step).
The cross-process correctness floor — atomic
ResourceLockinsert + unique PK +subq_running_rkexclusion inclaim_next_step— is preserved unchanged, so a future second workersees the row immediately and skips contending steps.
2. CPU-heavy work moved off the event loop
Every workflow step that touched a large pydantic / JSON / pypdf
structure on the event loop is now wrapped in
asyncio.to_thread.In
workflow.py, four new sync helpers were added next toread_file:_chunks_from_bytes(bytes) -> list[Chunk]— JSON-decode + per-itemChunk.model_validate._chunks_to_bytes(chunks) -> bytes— per-itemmodel_dump+JSON-encode.
_extract_pdf_metadata(bytes) -> dict— pypdf parse + page count +selected
/Infokeys._md5_hex(bytes) -> str— MD5 hex digest.Call-site changes:
split_parse_pdf_file—smart_split_to_files,BatchProcessor.execute_parallel, per-chunkjson.loads,merge_from_results, and final Docling serialization(
export_to_markdown+model_dump_json) all run on workerthreads.
validate_document—pypdf.PdfReaderparse runs on athread via
_extract_pdf_metadata. Also fixes a pre-existing bugwhere
/Subjectwas listed twice in the metadata loop.chunk_document—DoclingDocument.model_validate_jsonand thechunk → JSON-bytes serialization run on threads.
embed_document— chunk deserialize and embed-chunk serializeboth go through the helpers via
to_thread.save_to_rag— chunk deserialize andhashlib.md5moveoff-loop.
The dominant per-step blocker (
DoclingDocument.model_validate_jsonon a multi-MB Docling document) no longer pins the loop, so
heartbeats, lifecycle subscribers, and sibling consumer polls stay
responsive while a parse is in flight.
3.
docling.pyrewriteThe file was in a broken intermediate state when the PR opened: a
shadowed function name meant the public
docling_convertsymbolevery caller imports didn't exist, and the semaphore that was
supposed to gate docling-serve requests was dead code.
docling_convertwith@retryoutside andasync with get_docling_sem()inside. Failing attempts releasetheir slot during the exponential-backoff wait — fixes a
potential deadlock where a backed-up docling server could pin
every slot.
do_repl(recursive image-placeholder substitution) and the whole-tree
json.dumpsre-serialization no longer run while holding adocling-serve slot. They're split into a sync
_process_resulthelper that callers offload to
asyncio.to_thread.response.json()runs on a worker thread. TheDocling response body can be many MB; parsing it on the loop was
blocking sibling coroutines for seconds.
is_htmlbound to a fixed window. Previously the<bodycheck scanned the entire buffer — wasted up to 50 MB of
bytes.findon every non-HTML payload. Bounded to the first8 KB;
<htmlcheck stays at the original 100 bytes.Semaphore(12)insplit_parse_pdf_fileremoved.Process-wide gating now lives in one place (
docling_convert)with one tunable (
settings.docling_concurrency).4. Test fixture for docling semaphore
asyncio.Semaphorebinds to whichever event loop is current atconstruction time, which trips per-test loops with
RuntimeError: ... bound to a different loop. Added an autousefixture in
tests/conftest.pythat resetsdocling._docling_sem = Nonebefore and after each test.Tests
tests/unit/test_workflow_helpers.py— 21 tests covering allfour new sync helpers (roundtrip, empty input, duck-typed
model_dump, in-process PDF generation for_extract_pdf_metadata,known MD5 values, etc.).
tests/unit/test_docling.py— 23 tests coveringis_html(thenew 8 KB bound, the preserved 100-byte
<htmlwindow, theregression case where
<bodypast 8 KB is no longer detected),do_repl,get_docling_sem(lazy singleton, respects settings),_process_result(success/failure, placeholder mode, both outputformats).
tests/unit/test_runner_internals.py— 4 new tests for thein-process
asyncio.Lock(_run_stepdoesn't callacquire_resource_lock, sameresource_keyserializes,different
resource_keys run concurrently, no-key path skipsthe lock dict).
warning, sweeper-loop tests, race-release backoff, defensive
acquire path. Net test count: +44 − 10 + 21 + 23 = +78 (the
workflow_helpers and docling files are entirely new).
runner.pyis at 98%.Docs
ARCHITECTURE.md,DATABASE.md, andWORKFLOWS.mdupdated todescribe the new model: in-process serialization for workers, sentinel
expires_aton worker-heldResourceLockrows, removedclaim_lost_race/resource_lock_sweptmetrics, and the correctednarrative around CLI/web/lifecycle holders continuing to use real TTLs.
Backwards compatibility
ResourceLock.expires_atis the samecolumn, just holds a far-future sentinel for
WORKERrows. Theindex on
expires_atis still useful for CLI/web/lifecyclesweeps.
docling.docling_convertnow exists as the public symbol(it previously didn't, due to the rename bug, so this is strictly
a fix). All other public signatures unchanged.
docling_concurrencyis now actually honored as theprocess-wide gate. Previously the hardcoded
Semaphore(12)insplit_parse_pdf_fileoverrode it within a single PDF parse.Deployments that previously relied on the implicit 12 should
reconsider their
docling_concurrencyvalue.Operational notes
process per docling-serve. The cross-process correctness floor is
preserved for future scale-out, but the docling semaphore is
still per-process — multiple workers against a shared
docling-serve will see aggregate
N × docling_concurrencyparallelism at the server. Rate-limit at the proxy or in
docling-serve's queue if you scale out.
now bounded by
worker_checkin_timeout(default 600 s) ratherthan
resource_lock_ttl(was 300 s). Tuneworker_checkin_timeoutif 10 minutes is too long for your deployment.