Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,9 @@ claim-with-lease persistence layer:
alongside.
- **Pluggable metrics** — `Metrics` protocol with a default
`LoggingMetrics` no-op. Counters: `claim_attempts`,
`claim_success`, `claim_idle`, `claim_error`, `claim_lost_race`,
`step_completed`, `step_error`, `step_failed`, `step_released`,
`worker_reaped`, `lease_lost`, `resource_lock_swept`. Histograms:
`claim_duration`, `step_duration`.
`claim_success`, `claim_idle`, `claim_error`, `step_completed`,
`step_error`, `step_failed`, `step_released`, `worker_reaped`,
`lease_lost`. Histograms: `claim_duration`, `step_duration`.

Two workers can coexist in one process because nothing in the runner
is module-global anymore; legacy module-level `start_worker` /
Expand Down Expand Up @@ -168,17 +167,21 @@ graph LR
### Workflow Execution Flow

1. **Worker Startup** - `Worker.start()` spawns per-type consumer
loops plus heartbeat / reaper / lock-sweeper background tasks
loops plus heartbeat / reaper background tasks
2. **Step Claim** - Consumer calls `claim_next_step` with a fresh
lease token; atomic `UPDATE-FROM-SELECT-FOR-UPDATE-SKIP-LOCKED`
marks the row RUNNING. Steps whose `resource_key` is currently
locked are skipped at the SQL layer
3. **Resource Lock Acquire** - If the step declared a
`resource_key`, the worker acquires the matching `ResourceLock`
row (TTL-refreshed on heartbeat)
marks the row RUNNING and (if the step has a `resource_key`)
inserts the matching `ResourceLock` row in the same transaction.
Steps whose `resource_key` is currently held are skipped at the
SQL layer.
3. **In-Process Serialization** - The worker takes a per-key
`asyncio.Lock` before invoking the user step handler, so two
consumer coroutines targeting the same `resource_key` serialize
without DB I/O and without TTL semantics
4. **Status Transition** - PENDING → RUNNING → COMPLETED / ERROR /
FAILED; terminal writes are gated on the lease so a stale
worker cannot double-finalize
worker cannot double-finalize. The `ResourceLock` row is
cleared in the same transaction as the terminal write.
5. **Step Execution** - Calls registered handler method
6. **Artifact Storage** - Saves intermediate results
7. **Retry Logic** - `error_step` increments retry; elevates to
Expand Down
27 changes: 15 additions & 12 deletions docs/DATABASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -599,23 +599,26 @@ here.
- `holder_kind` (ResourceLockKind) - `worker`, `cli`, `web`, or `lifecycle`
- `step_id` (int, nullable) - Set when held by a worker on behalf of a step
- `acquired_at` (datetime) - When the lock was acquired
- `expires_at` (datetime, indexed) - TTL boundary; refreshed by holder heartbeats
- `expires_at` (datetime, indexed) - TTL boundary for non-worker
holders. Worker holders use a far-future sentinel so the row is
cleared by step-status transitions, not by a TTL sweep.
- `holder_meta` (dict[str, str]) - JSON metadata

**Lifecycle:**

- Acquired via `operations.acquire_resource_lock(...)` — opportunistically
sweeps expired rows before attempting insert under the unique primary key
- TTL-refreshed via `refresh_resource_lock(...)` (workers refresh on
heartbeat at half the TTL)
- Released via `release_resource_lock(...)` (idempotent) or
- Worker holders: inserted atomically by `claim_next_step` in the
same transaction as the `RunStep` status update. Dropped by
`complete_step` / `error_step` / `release_step` in the same
transaction as the step terminal write, or by
`reap_dead_workers` keyed on lease token.
- CLI / web / lifecycle holders: acquired via
`operations.acquire_resource_lock(...)` with a real TTL, which
opportunistically sweeps expired rows before attempting insert
under the unique primary key. Released via
`release_resource_lock(...)` (idempotent) or
`force_release_resource_lock(...)` (audit-logged, used by
`si-diag vacuum --force`)
- Dropped automatically by `complete_step` / `error_step` /
`release_step` in the same transaction as the step terminal
write
- Expired rows are swept by `sweep_expired_resource_locks()` on a
60-second loop in each worker
`si-diag vacuum --force`). Expired rows are also swept on demand
by `sweep_expired_resource_locks()`.

**Example:**

Expand Down
9 changes: 5 additions & 4 deletions docs/WORKFLOWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -589,15 +589,13 @@ production:
| `claim_success` | A step was claimed |
| `claim_idle` | No work was available |
| `claim_error` | The claim query raised |
| `claim_lost_race` | Step's `resource_key` was held between claim and acquire|
| `step_completed` | Successful terminal write |
| `step_error` | Retryable failure |
| `step_failed` | Retries exhausted |
| `step_released` | Cooperative release on shutdown |
| `step_reset_by_reaper` | A dead worker's step was reset to PENDING |
| `worker_reaped` | A peer worker was reaped |
| `lease_lost` | Terminal write found a non-matching lease (labelled `phase`) |
| `resource_lock_swept` | Expired `ResourceLock` rows were swept |

| Histogram | Description |
|-------------------|--------------------------------------|
Expand Down Expand Up @@ -711,8 +709,11 @@ points at a holder that no longer exists.

**Solution:**
1. Inspect the lock: `SELECT * FROM resourcelock WHERE resource_key='rag:/path/to/db'`
2. The background `sweep_expired_resource_locks` loop clears
expired holders every 60s; if the row is genuinely stuck,
2. Worker-held lock rows are cleared when the step completes,
errors, is released, or the holder is reaped via
`WorkerCheckin`. CLI/web/lifecycle holders use a real TTL and
are cleared opportunistically by `acquire_resource_lock` or by
`sweep_expired_resource_locks`. If the row is genuinely stuck,
break it from the CLI:

```bash
Expand Down
1 change: 0 additions & 1 deletion src/soliplex/ingester/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class Settings(BaseSettings):
embeddings_store_dir: str = "embeddings"
stop_phrases: list[str] = []

ingest_queue_concurrency: int = 20
ingest_worker_concurrency: int = 10
docling_concurrency: int = 3
input_s3: S3Settings = S3Settings()
Expand Down
131 changes: 103 additions & 28 deletions src/soliplex/ingester/lib/docling.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import http.cookiejar as cj
import json
import logging
Expand Down Expand Up @@ -27,6 +28,20 @@
"max_tokens": 200,
}

# Process-wide async semaphore that bounds the number of concurrent
# in-flight requests to the docling-serve backend. Initialized lazily
# so it binds to the running event loop on first use; tests that
# spin up fresh loops should reset this back to None between cases
# (see ``tests/conftest.py``).
_docling_sem: asyncio.Semaphore | None = None


def get_docling_sem() -> asyncio.Semaphore:
global _docling_sem
if _docling_sem is None:
_docling_sem = asyncio.Semaphore(get_settings().docling_concurrency)
return _docling_sem


def do_repl(data):
if isinstance(data, dict):
Expand All @@ -39,7 +54,14 @@ def do_repl(data):


def is_html(file_bytes: bytes) -> bool:
return (file_bytes.startswith(b"<!DOCTYPE html>") or b"<html" in file_bytes[:100]) and b"<body" in file_bytes
"""Detect HTML from the leading bytes only.

The ``<html`` check stays at the first 100 bytes (matches the
original behavior). The ``<body`` check is bounded to the first
8 KB so non-HTML payloads (e.g. multi-MB PDFs) do not trigger a
full-buffer ``bytes.find``.
"""
return (file_bytes.startswith(b"<!DOCTYPE html>") or b"<html" in file_bytes[:100]) and b"<body" in file_bytes[:8192]


@retry(stop=stop_after_attempt(4), wait=wait_exponential_jitter(), reraise=True)
Expand All @@ -52,17 +74,53 @@ async def docling_convert(
) -> dict:
"""Convert a document via the docling-serve HTTP backend.

Concurrency for ``parse`` steps is now bounded at the worker
level by the ``parse`` consumer pool count
(``Worker(consumers={"parse": N, ...})``); the library does no
rate-limiting of its own. To preserve the previous default,
operators should set ``consumers["parse"]`` equal to
``settings.docling_concurrency``.
Process-wide concurrency to docling-serve is bounded by
``settings.docling_concurrency`` via the module-global
``_docling_sem``. The semaphore wraps only the HTTP / websocket
/ result-GET round trip — large-document post-processing
(recursive image-placeholder substitution and whole-tree
re-serialization) runs outside the slot and on a worker thread,
so CPU work cannot starve docling-serve capacity.

The semaphore is acquired *inside* the tenacity retry so a
failing attempt releases its slot while it waits to retry.

Note: the gate is per-process. If multiple worker processes
share one docling-serve, the server sees
``N_workers * docling_concurrency`` aggregate parallelism; in
that topology, rate-limit at the proxy or in docling-serve's
own queue instead.
"""
async with get_docling_sem():
res, parameters = await _docling_request(
file_bytes,
mime_type,
source_uri,
config_dict,
output_formats,
)
return await asyncio.to_thread(_process_result, res, parameters, source_uri, output_formats)


async def _docling_request(
file_bytes: bytes,
mime_type: str,
source_uri: str,
config_dict: dict[str, str | int | bool],
output_formats: list[str],
) -> tuple[dict, dict]:
"""POST + websocket wait + result GET against docling-serve.

Returns ``(res, parameters)``. ``res`` is the parsed result
document; ``parameters`` is the request body as sent (the
caller needs ``image_export_mode`` from it to decide on
placeholder replacement). Raises ``ValueError`` on protocol
errors, which the outer ``@retry`` retries.
"""
env = get_settings()
local_jar = cj.CookieJar()
async_url = f"{env.docling_server_url}/convert/file/async"
parameters = {
parameters: dict = {
"from_formats": [
"docx",
"pptx",
Expand Down Expand Up @@ -102,7 +160,6 @@ async def docling_convert(
parameters["do_picture_description"] = False

file_name = source_uri.split("/")[-1]

if mime_type and "markdown" in mime_type and not file_name.endswith(".md"):
file_name = file_name + ".md"
# docling requires some special handling for html
Expand All @@ -112,9 +169,7 @@ async def docling_convert(

f = BytesIO(file_bytes)
try:
files = {
"files": (file_name, f, mime_type),
}
files = {"files": (file_name, f, mime_type)}
logger.debug(f"using {parameters} on {file_name}")
async with httpx.AsyncClient(timeout=env.docling_http_timeout, cookies=local_jar) as _async_client:
response = await _async_client.post(async_url, files=files, data=parameters)
Expand Down Expand Up @@ -143,28 +198,48 @@ async def docling_convert(
logger.error(f"no errors in response: {payload}")
result_url = f"{env.docling_server_url}/result/{task_id}"
response = await _async_client.get(result_url)
res = response.json()
# The result body carries the full Docling document, which
# can be many MB. ``response.json()`` runs stdlib
# ``json.loads`` on the calling thread — offload so the
# event loop stays responsive while it parses.
res = await asyncio.to_thread(response.json)
logger.info(f"{task_id} result={res.get('status')} processing time={res.get('processing_time')}")
finally:
f.close()
return res, parameters


def _process_result(
res: dict,
parameters: dict,
source_uri: str,
output_formats: list[str],
) -> dict:
"""Validate the docling-serve result and re-serialize each
requested output format.

Pure Python. ``do_repl`` walks the entire JSON tree and
``json.dumps`` re-serializes it — seconds of work on a large
document. Designed to run on a worker thread (see the
``asyncio.to_thread`` call in :func:`docling_convert`).
"""
if "status" not in res:
raise ValueError(f"no status in response: {res}")
logger.info(f"{task_id} result={res['status']} processing time={res['processing_time']}")

if res["status"] == "success":
parsed = {}
for output_format in output_formats:
output_content = res["document"][f"{output_format}_content"]
if output_format == "json":
if "image_export_mode" in parameters and parameters["image_export_mode"] == "placeholder":
logger.info(f" doing placeholder replacement for {source_uri}")
output_content = do_repl(output_content)
parsed[output_format] = json.dumps(output_content).encode("utf-8")
else:
parsed[output_format] = str(output_content).encode("utf-8")
return parsed
else:
if res["status"] != "success":
raise ValueError(str(res["errors"]))

parsed: dict[str, bytes] = {}
for output_format in output_formats:
output_content = res["document"][f"{output_format}_content"]
if output_format == "json":
if parameters.get("image_export_mode") == "placeholder":
logger.info(f" doing placeholder replacement for {source_uri}")
output_content = do_repl(output_content)
parsed[output_format] = json.dumps(output_content).encode("utf-8")
else:
parsed[output_format] = str(output_content).encode("utf-8")
return parsed


def get_docling_schema_version() -> str:
import docling_core.types.doc.document as dd
Expand Down
2 changes: 1 addition & 1 deletion src/soliplex/ingester/lib/rag.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ async def save_to_rag(
logger.info(f"Found existing document {found[0].id}", extra=_log_con)
doc_id = found[0].id
await client.delete_document(doc_id)
logger.debug(f"deleted existing document {found[0].id}", extra=_log_con)
logger.info(f"deleted existing document {found[0].id}", extra=_log_con)

new_doc = await client.import_document(
chunks=chunks,
Expand Down
13 changes: 10 additions & 3 deletions src/soliplex/ingester/lib/wf/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1770,13 +1770,21 @@ def _utc_now() -> datetime.datetime:
RunStatus.ERROR,
)

# Sentinel expires_at for WORKER-held ``ResourceLock`` rows. Worker
# locks have no functional TTL — they're cleared by complete_step,
# error_step, release_step, or reap_dead_workers — but the row must
# still satisfy ``subq_locked``'s ``expires_at > now`` predicate and
# stay clear of the opportunistic ``WHERE expires_at < now`` sweep.
# CLI/web/lifecycle holders going through ``acquire_resource_lock``
# continue to use real TTLs.
_WORKER_LOCK_EXPIRES = datetime.datetime(9999, 12, 31)


async def claim_next_step(
worker_id: str,
lease_token: str,
allowed_types: list[WorkflowStepType] | None = None,
batch_id: int | None = None,
resource_lock_ttl: int = 300,
holder_meta: dict[str, str] | None = None,
) -> RunStep | None:
"""Atomically claim the next eligible step for *worker_id*.
Expand Down Expand Up @@ -1821,7 +1829,6 @@ async def claim_next_step(
atomic resource-lock insert lost a race.
"""
now = _utc_now()
expires = now + datetime.timedelta(seconds=resource_lock_ttl)
async with get_session() as session:
# Subquery 1: minimum step number per eligible workflow run.
subq_min_step = (
Expand Down Expand Up @@ -1927,7 +1934,7 @@ async def claim_next_step(
holder_kind=ResourceLockKind.WORKER,
step_id=step.id,
acquired_at=now,
expires_at=expires,
expires_at=_WORKER_LOCK_EXPIRES,
holder_meta=holder_meta or {"worker_id": worker_id},
),
)
Expand Down
Loading
Loading