Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ repos:
- id: pip-audit
name: pip-audit

#ignore due to cooldown
entry: uv audit --no-dev --frozen
language: system
pass_filenames: false
Expand Down
9 changes: 6 additions & 3 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ src/soliplex/ingester/
├── rag.py # HaikuRAG integration
├── processing.py # Document processing utilities
└── wf/ # Workflow execution engine
├── runner.py # Async worker
├── operations.py
├── runner.py # `Worker` class, lifecycle bus, metrics
├── operations.py # Persistence: atomic claim, lease tokens,
│ # ResourceLock, group-completion
└── registry.py # Workflow/param loading from YAML

config/
Expand Down Expand Up @@ -132,8 +133,10 @@ DOC_DB_URL="postgresql+psycopg://user:pass@host:5432/soliplex"
| Document | Unique documents by SHA256 hash |
| DocumentURI | Maps URIs to documents |
| WorkflowRun | Single workflow execution |
| RunStep | Individual step in workflow |
| RunStep | Individual step; carries `lease_token` and `resource_key` for the claim layer |
| RunGroup | Groups workflow runs for a batch |
| WorkerCheckin | Worker heartbeat (driven by `Worker.start/stop`) |
| ResourceLock | Cross-subsystem rendezvous for RAG-DB writers |
| SyncState | Incremental sync tracking |

## API Routes
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ RUN apt-get update && \
apt-get install -y --no-install-recommends git && \
rm -rf /var/lib/apt/lists/*

COPY --from=ghcr.io/astral-sh/uv:0.11.3 /uv /uvx /bin/
COPY --from=ghcr.io/astral-sh/uv:0.11.14 /uv /uvx /bin/

RUN groupadd -g ${APP_GID} appuser && \
useradd -u ${APP_UID} -g ${APP_GID} -m -s /bin/bash appuser
Expand Down
44 changes: 38 additions & 6 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -1030,14 +1030,36 @@ Optimize and clean up database tables to reduce disk usage.
**Response:**

- `200 OK` - Vacuum completed successfully
- `500 Internal Server Error` - Vacuum failed
- `404 Not Found` - Database does not exist at the resolved path
- `409 Conflict` - The DB's `ResourceLock` is held by another
writer (workflow worker, CLI vacuum, lifecycle vacuum). The
endpoint uses `max_wait=0` for fail-fast behaviour.
- `500 Internal Server Error` - Vacuum failed for any other reason

**Response Body:**
**Response Bodies:**

`200 OK`:

```json
{
"status": "ok"
}
{ "status": "ok" }
```

`404 Not Found`:

```json
{ "status": "not_found", "error": "Database does not exist at ..." }
```

`409 Conflict`:

```json
{ "status": "locked", "error": "RAG DB locked by worker:<lease> since ..." }
```

`500 Internal Server Error`:

```json
{ "status": "error", "error": "Failed to vacuum database: ..." }
```

**Example:**
Expand All @@ -1046,7 +1068,17 @@ Optimize and clean up database tables to reduce disk usage.
curl "http://localhost:8000/api/v1/lancedb/vacuum?db=default"
```

**Note:** Vacuum removes deleted rows and compacts table files. Run periodically after bulk deletions.
**Notes:**

- Vacuum holds the cross-subsystem `ResourceLock` (holder_kind=`web`)
for the duration of the operation, so it cannot race workflow
`save_to_rag` steps, CLI vacuums, or lifecycle vacuums.
- Returns 409 immediately if the lock cannot be acquired — retry
later, or break the lock from the CLI with
`si-diag lancedb vacuum <db> --force`.
- Run periodically after bulk deletions, or wire the
`batch_split_vacuum` workflow to vacuum at the end of every
run group.

---

Expand Down
93 changes: 77 additions & 16 deletions docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,58 @@ Implementation: `src/soliplex/ingester/lib/wf/`

### 3. Worker System

Async workers process workflow steps concurrently:

- Workers poll for pending workflow steps
- Configurable concurrency levels for different operations
- Automatic retry logic with configurable retry counts
- Health check/heartbeat system via `WorkerCheckin`
Workflow execution is owned by a class-based orchestrator (`Worker`)
that runs a set of typed consumer pools backed by an atomic
claim-with-lease persistence layer:

- **Typed consumer pools** — `Worker(WorkerConfig(consumers={"parse": 4,
"store": 8, "*": 2}))`. Each pool calls `operations.claim_next_step`
with an `allowed_types` filter, so per-step-type concurrency is
bounded at the database level instead of an in-process semaphore.
The `"*"` pool is a catch-all for any step type not explicitly
pinned.
- **Atomic claim + lease tokens** — every claim is an
`UPDATE-FROM-SELECT-FOR-UPDATE-SKIP-LOCKED` that stamps the row
with a UUID lease. `complete_step` / `error_step` / `release_step`
are gated on a matching lease, so a worker reaped between claim
and write can never bounce a fresh claimant.
- **`ResourceLock` rendezvous** — STORE-type steps stamp a
`resource_key` (typically `rag:<abs-db-path>`) at run-creation time.
The claim layer skips a step whose key is currently held, and the
worker acquires the lock for the duration of execution. The same
table is the rendezvous point for the web vacuum endpoint, the
`si-diag` CLI, and `end_group` lifecycle vacuums.
- **Graceful shutdown** — `Worker.stop(timeout)` signals consumers to
stop claiming, waits for in-flight work, then cancels remaining
consumers. Cancelled consumers call `release_step` so the row
returns to PENDING immediately rather than waiting for the
worker-checkin timeout. The worker also deletes its own checkin
row on shutdown.
- **Event-driven lifecycle bus** — `LifecycleBus` fires hooks on
fire-and-forget tasks; a slow `STEP_START` handler can never
block step execution. `GROUP_END` is emitted by a coordinator
that subscribes to step-end events and consults
`operations.try_complete_run_group(group_id)` for exactly-once
semantics across N workers.
- **Self-skipping reaper** — `reap_dead_workers(my_id, threshold)`
always excludes the caller, eliminating the self-reaping race
where a stalled checkin loop would cause a worker to reset its
own in-flight steps. Reaped workers' resource locks are cleared
alongside.
- **Pluggable metrics** — `Metrics` protocol with a default
`LoggingMetrics` no-op. Counters: `claim_attempts`,
`claim_success`, `claim_idle`, `claim_error`, `claim_lost_race`,
`step_completed`, `step_error`, `step_failed`, `step_released`,
`worker_reaped`, `lease_lost`, `resource_lock_swept`. Histograms:
`claim_duration`, `step_duration`.

Two workers can coexist in one process because nothing in the runner
is module-global anymore; legacy module-level `start_worker` /
`stop_worker` / `get_worker_id` shims are preserved for the few
existing callers but new code should construct a `Worker` directly.

Worker implementation: `src/soliplex/ingester/lib/wf/runner.py`
Persistence seam: `src/soliplex/ingester/lib/wf/operations.py`

### 4. Storage Layer

Expand Down Expand Up @@ -123,14 +167,28 @@ graph LR

### Workflow Execution Flow

1. **Worker Startup** - Worker registers and starts polling
2. **Step Selection** - Worker queries for PENDING steps with `FOR UPDATE` lock
3. **Status Transition** - PENDING → RUNNING → COMPLETED/ERROR/FAILED
4. **Step Execution** - Calls registered handler method
5. **Artifact Storage** - Saves intermediate results
6. **Retry Logic** - Automatic retry on ERROR status
7. **Run Completion** - Aggregates step status to run status
8. **Group Completion** - Aggregates run status to group status
1. **Worker Startup** - `Worker.start()` spawns per-type consumer
loops plus heartbeat / reaper / lock-sweeper background tasks
2. **Step Claim** - Consumer calls `claim_next_step` with a fresh
lease token; atomic `UPDATE-FROM-SELECT-FOR-UPDATE-SKIP-LOCKED`
marks the row RUNNING. Steps whose `resource_key` is currently
locked are skipped at the SQL layer
3. **Resource Lock Acquire** - If the step declared a
`resource_key`, the worker acquires the matching `ResourceLock`
row (TTL-refreshed on heartbeat)
4. **Status Transition** - PENDING → RUNNING → COMPLETED / ERROR /
FAILED; terminal writes are gated on the lease so a stale
worker cannot double-finalize
5. **Step Execution** - Calls registered handler method
6. **Artifact Storage** - Saves intermediate results
7. **Retry Logic** - `error_step` increments retry; elevates to
FAILED when `retry >= retries` and cascades pending siblings to
CANCELLED
8. **Run Completion** - `recompute_run_status` derives the run
status from current step counts (idempotent)
9. **Group Completion** - `try_complete_run_group` is an atomic
conditional update; the worker whose update affected a row fires
`GROUP_END` exactly once

## Configuration

Expand Down Expand Up @@ -200,9 +258,12 @@ Add event handlers in workflow configuration to respond to:
**Database Tables:**

- `workflowrun` - Track run status and duration
- `runstep` - Monitor individual step execution
- `workcheckin` - Worker health and activity
- `runstep` - Monitor individual step execution; carries
`lease_token` and `resource_key` columns used by the claim layer
- `workercheckin` - Worker health and activity
- `lifecyclehistory` - Audit trail of events
- `resourcelock` - Cross-subsystem lock rendezvous for RAG-DB
writers (workflow, web vacuum, CLI vacuum, lifecycle vacuum)

**Metrics Available:**

Expand Down
83 changes: 83 additions & 0 deletions docs/CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,13 @@ si-cli vacuum DB_NAME [OPTIONS]
- Automatically runs pending migrations before vacuuming if required
- Sets `vacuum_retention_seconds` to 0 to ensure all deleted data is reclaimed
- Will not create a new database — errors if the path does not exist
- Holds the cross-subsystem `ResourceLock` (holder_kind=`cli`)
for the duration of the operation. Workflow `save_to_rag` steps
for the same DB are blocked from claim until the lock is
released, so the vacuum cannot race a writer.
- Default behaviour waits forever for the lock; if you need fail-fast
semantics, prefer `si-diag lancedb vacuum` which uses `max_wait=0`
and offers `--force`.

**Examples:**

Expand Down Expand Up @@ -928,6 +935,7 @@ The `si-diag` CLI provides read-only access to system state for debugging and mo
| `run-group` | List and inspect run groups |
| `workflow` | List and inspect workflow runs and steps |
| `status` | View running steps, recent activity, and aggregated details |
| `lancedb` | Vacuum, vacuum-all, and verify HMAC of LanceDB databases (with `--force` to break a stuck lock) |

---

Expand Down Expand Up @@ -1181,6 +1189,81 @@ si-diag status details 1

---

### lancedb vacuum

Vacuum a single LanceDB database. Fails fast (exit code 2) if the
cross-subsystem `ResourceLock` is held by another writer.

```bash
si-diag lancedb vacuum my_database
si-diag lancedb vacuum my_database --sign
si-diag lancedb vacuum my_database --force
```

**Arguments:**

- `DB_NAME` (str, required) - Name of the database directory under `LANCEDB_DIR`

**Options:**

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `--sign` | bool | False | Write an HMAC-SHA512 signature after vacuuming (requires `LANCEDB_HMAC_KEY`) |
| `--force` | bool | False | Audit-log + drop the `ResourceLock` row before retrying (use when a holder has crashed) |

**Exit Codes:**

- `0` - Vacuum completed
- `1` - Database not found (used with `--force` resolution)
- `2` - Lock held by another writer; pass `--force` to break

**Notes:**

- Acquires the `ResourceLock` with `holder_kind=cli`, `max_wait=0`
- `--force` calls `force_release_resource_lock`, which emits a
warning log line containing the previous holder for audit

---

### lancedb vacuum-all

Vacuum every database under `LANCEDB_DIR`. DBs whose lock is held
by another writer are skipped with a printed message; processing
continues with the remaining databases.

```bash
si-diag lancedb vacuum-all
si-diag lancedb vacuum-all --sign
```

**Options:**

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `--sign` | bool | False | HMAC-sign each database after vacuuming |

---

### lancedb verify

Verify the HMAC-SHA512 signature of a LanceDB database against
its `.hmac` sidecar. Requires `LANCEDB_HMAC_KEY`.

```bash
si-diag lancedb verify my_database
```

**Arguments:**

- `DB_NAME` (str, required) - Name of the database directory under `LANCEDB_DIR`

**Exit Codes:**

- `0` - HMAC verification passed
- `1` - Verification failed or sidecar missing

---

## Future Commands

Commands planned for future releases:
Expand Down
Loading
Loading