Conversation

@tempusfrangit (Member) commented Jan 16, 2026

This is a complete rewrite of the cog HTTP server, moving away from Python to a Rust PyO3 abi3 wheel. The aim is a significant uplift in control over how we run predictors, and isolation of the predictor run from the core interface used to drive it.

═══════════════════════════════════════════════════════════════════════════════════
                           COMPONENT OWNERSHIP
═══════════════════════════════════════════════════════════════════════════════════
  PredictionSupervisor (DashMap - lock-free concurrent access)
  ├── owns: prediction state (id, status, input, output, logs, error, timestamps)
  ├── owns: webhook sender (sends terminal webhook, then cleans up entry)
  ├── owns: cancel_token (propagates cancellation to worker)
  └── owns: completion notifier (for waiting on result)
  PredictionSlot (RAII container)
  ├── owns: Prediction (logs, outputs from worker event loop)
  ├── owns: Permit (concurrency token, returns to pool on drop)
  └── Drop: marks permit idle, releases back to pool
  PredictionHandle (returned to route handler)
  ├── has: reference to supervisor (for state queries)
  ├── has: cancel_token clone (for cancellation)
  ├── has: completion notifier clone (for waiting)
  └── method: sync_guard() → SyncPredictionGuard (cancels on drop)
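The `sync_guard()` cancel-on-drop behavior can be sketched in std-only Rust. This is illustrative: the real implementation presumably uses tokio_util's `CancellationToken`, and the `CancelToken` stand-in, `disarm`, and field names here are assumptions.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical stand-in for tokio_util's CancellationToken.
#[derive(Clone, Default)]
struct CancelToken(Arc<AtomicBool>);

impl CancelToken {
    fn cancel(&self) { self.0.store(true, Ordering::SeqCst); }
    fn is_cancelled(&self) -> bool { self.0.load(Ordering::SeqCst) }
}

/// Cancels the prediction if the handler drops the guard before the
/// prediction completes (e.g. on HTTP connection drop).
struct SyncPredictionGuard {
    cancel_token: CancelToken,
    disarmed: bool,
}

impl SyncPredictionGuard {
    fn new(cancel_token: CancelToken) -> Self {
        Self { cancel_token, disarmed: false }
    }

    /// Called after a successful response: the prediction finished,
    /// so dropping the guard must not cancel it.
    fn disarm(mut self) { self.disarmed = true; }
}

impl Drop for SyncPredictionGuard {
    fn drop(&mut self) {
        if !self.disarmed {
            self.cancel_token.cancel();
        }
    }
}
```

Dropping the guard on any early exit path (panic, client disconnect) propagates cancellation without explicit cleanup code in the route handler.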
═══════════════════════════════════════════════════════════════════════════════════
                         WORKER SUBPROCESS PROTOCOL
═══════════════════════════════════════════════════════════════════════════════════
  Control Channel (stdin/stdout - JSON framed)
  ┌─────────────────────────────────────────────────────────────────────────────┐
  │  Parent → Child                    Child → Parent                           │
  │  ──────────────                    ──────────────                           │
  │  Init { predictor_ref, slots }     Ready { schema }                         │
  │  Cancel                            Cancelled                                │
  │  Shutdown                          Failed { error }                         │
  │                                    Idle                                     │
  └─────────────────────────────────────────────────────────────────────────────┘
  Slot Channel (Unix socket per slot - JSON framed)
  ┌─────────────────────────────────────────────────────────────────────────────┐
  │  Parent → Child                    Child → Parent                           │
  │  ──────────────                    ──────────────                           │
  │  Predict { id, input }             Log { data }                             │
  │                                    Output { value }  (streaming)            │
  │                                    Done { output }                          │
  │                                    Failed { error }                         │
  │                                    Cancelled                                │
  └─────────────────────────────────────────────────────────────────────────────┘
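The slot-channel messages above map naturally onto Rust enums. A simplified sketch (field types reduced to `String` for illustration; the real types in `bridge/protocol.rs` carry structured values and derive serde for the JSON framing):

```rust
/// Parent → child message on a slot channel.
#[derive(Debug)]
enum SlotRequest {
    Predict { id: String, input: String },
}

/// Child → parent messages on a slot channel.
#[derive(Debug)]
enum SlotResponse {
    Log { data: String },
    Output { value: String }, // streaming: one per yielded chunk
    Done { output: String },
    Failed { error: String },
    Cancelled,
}

impl SlotResponse {
    /// Only Done/Failed/Cancelled end the slot's prediction;
    /// Log and Output are intermediate events.
    fn is_terminal(&self) -> bool {
        matches!(
            self,
            SlotResponse::Done { .. } | SlotResponse::Failed { .. } | SlotResponse::Cancelled
        )
    }
}
```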
═══════════════════════════════════════════════════════════════════════════════════
                            HEALTH STATE MACHINE
═══════════════════════════════════════════════════════════════════════════════════
                    ┌──────────────┐
                    │   STARTING   │  (HTTP serves 503)
                    └──────┬───────┘
                           │
              ┌────────────┴────────────┐
              ▼                         ▼
     ┌────────────────┐        ┌───────────────┐
     │     READY      │        │ SETUP_FAILED  │  (setup() threw)
     └────────┬───────┘        └───────────────┘
              │
              │ (all slots busy)
              ▼
     ┌────────────────┐
     │     BUSY       │  (409 for new predictions)
     └────────┬───────┘
              │
              │ (slot freed)
              ▼
     ┌────────────────┐
     │     READY      │
     └────────┬───────┘
              │
              │ (fatal error / worker crash)
              ▼
     ┌────────────────┐
     │    DEFUNCT     │  (HTTP serves 503)
     └────────────────┘
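A minimal sketch of this state machine as a Rust enum, with the HTTP status each state implies for a new prediction request. The 503 and 409 codes come from the diagram; 200 for READY is an assumption, as is including the `Unknown` variant listed in the commit log further down.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Health {
    Unknown,
    Starting,
    Ready,
    Busy,
    SetupFailed,
    Defunct,
}

impl Health {
    /// Status returned for a new POST /predictions request.
    fn prediction_status(self) -> u16 {
        match self {
            Health::Ready => 200,
            Health::Busy => 409, // all slots busy
            Health::Unknown | Health::Starting | Health::SetupFailed | Health::Defunct => 503,
        }
    }

    /// SetupFailed and Defunct are dead ends: no transition recovers them.
    fn is_terminal(self) -> bool {
        matches!(self, Health::SetupFailed | Health::Defunct)
    }
}
```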
═══════════════════════════════════════════════════════════════════════════════════
                              FILE STRUCTURE
═══════════════════════════════════════════════════════════════════════════════════
  crates/coglet/src/
  ├── lib.rs                    # Public API exports
  ├── service.rs                # PredictionService (orchestrates everything)
  ├── supervisor.rs             # PredictionSupervisor (lifecycle + webhooks)
  ├── prediction.rs             # Prediction state (logs, outputs, status)
  ├── health.rs                 # Health enum + SetupResult
  ├── orchestrator.rs           # Worker subprocess management
  ├── permit/
  │   ├── mod.rs
  │   ├── pool.rs               # PermitPool (concurrency control)
  │   └── slot.rs               # PredictionSlot (Prediction + Permit RAII)
  ├── bridge/
  │   ├── mod.rs
  │   ├── protocol.rs           # Control/Slot request/response types
  │   ├── codec.rs              # JSON length-delimited framing
  │   └── transport.rs          # Unix socket transport
  ├── transport/
  │   └── http/
  │       ├── mod.rs
  │       ├── server.rs         # Axum server setup
  │       └── routes.rs         # HTTP handlers (uses supervisor)
  ├── webhook.rs                # WebhookSender (retry logic, trace context)
  ├── worker/
  │   ├── mod.rs
  │   ├── manager.rs            # Worker spawn/lifecycle
  │   └── worker.rs             # Worker main loop (child process side)
  └── version.rs                # VersionInfo
  crates/coglet-python/src/
  └── lib.rs                    # PyO3 bindings (coglet.serve())
┌─────────────────────────────────────────────────────────────────────────────────┐
│                              HTTP Transport (axum)                               │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐ │
│  │ POST        │  │ PUT         │  │ POST        │  │ GET                     │ │
│  │ /predictions│  │ /predictions│  │ /cancel     │  │ /health-check           │ │
│  │             │  │ /{id}       │  │             │  │ /openapi.json           │ │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘  └───────────┬─────────────┘ │
└─────────┼────────────────┼────────────────┼─────────────────────┼───────────────┘
        │                │                │                     │
        ▼                ▼                ▼                     ▼
┌─────────────────────────────────────────────────────────────────────────────────┐
│                            PredictionService                                     │
│  ┌────────────────────────────────────────────────────────────────────────────┐ │
│  │                        PredictionSupervisor (DashMap)                      │ │
│  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐            │ │
│  │  │ PredictionEntry │  │ PredictionEntry │  │ PredictionEntry │  ...       │ │
│  │  │ ─────────────── │  │ ─────────────── │  │ ─────────────── │            │ │
│  │  │ state           │  │ state           │  │ state           │            │ │
│  │  │ cancel_token    │  │ cancel_token    │  │ cancel_token    │            │ │
│  │  │ webhook         │  │ webhook         │  │ webhook         │            │ │
│  │  │ completion      │  │ completion      │  │ completion      │            │ │
│  │  └─────────────────┘  └─────────────────┘  └─────────────────┘            │ │
│  └────────────────────────────────────────────────────────────────────────────┘ │
│                                                                                  │
│  ┌────────────────────────────────────────────────────────────────────────────┐ │
│  │                           PermitPool                                       │ │
│  │  ┌────────┐  ┌────────┐  ┌────────┐                                       │ │
│  │  │ Permit │  │ Permit │  │ Permit │  (concurrency control)                │ │
│  │  │ slot_0 │  │ slot_1 │  │ slot_2 │                                       │ │
│  │  └────────┘  └────────┘  └────────┘                                       │ │
│  └────────────────────────────────────────────────────────────────────────────┘ │
│                                                                                  │
│  ┌────────────────────────────────────────────────────────────────────────────┐ │
│  │                        OrchestratorHandle                                  │ │
│  │  (slot_ids, control_tx for worker comms)                                   │ │
│  └────────────────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────┬──────────────────────────────────────────────┘
                                 │
                  Unix Socket (slot) + stdin/stdout (control)
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────────────────────┐
│                         Worker Subprocess (Python)                               │
│  ┌────────────────────────────────────────────────────────────────────────────┐ │
│  │                              Predictor                                     │ │
│  │  ┌─────────────────────────────────────────────────────────────────────┐  │ │
│  │  │  setup()    →  runs once at startup                                 │  │ │
│  │  │  predict()  →  handles SlotRequest::Predict                         │  │ │
│  │  └─────────────────────────────────────────────────────────────────────┘  │ │
│  └────────────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────┘
═══════════════════════════════════════════════════════════════════════════════════
                            PREDICTION FLOW
═══════════════════════════════════════════════════════════════════════════════════
Sync Request (POST /predictions)
─────────────────────────────────
  Client                    Routes                Supervisor           Worker
     │                        │                       │                   │
     │──POST /predictions────▶│                       │                   │
     │                        │──submit(id,input)────▶│                   │
     │                        │◀──PredictionHandle────│                   │
     │                        │                       │                   │
     │                        │──acquire permit──────▶│                   │
     │                        │                       │                   │
     │     (SyncPredictionGuard held - cancels on connection drop)       │
     │                        │                       │                   │
     │                        │──predict(slot,input)─────────────────────▶│
     │                        │                       │                   │
     │                        │◀─────────────result──────────────────────│
     │                        │                       │                   │
     │                        │──update_status(terminal)─▶│              │
     │                        │                       │──send webhook───▶│
     │                        │                       │──cleanup entry───│
     │                        │                       │                   │
     │◀──200 {output}─────────│                       │                   │
Async Request (Prefer: respond-async)
─────────────────────────────────────
  Client                    Routes                Supervisor           Worker
     │                        │                       │                   │
     │──POST + respond-async─▶│                       │                   │
     │                        │──submit(id,input)────▶│                   │
     │                        │◀──PredictionHandle────│                   │
     │                        │                       │                   │
     │◀──202 {starting}───────│                       │                   │
     │                        │                       │                   │
     │   (no guard - prediction continues independently)                 │
     │                        │                       │                   │
     │                        │    ┌──────────────────────────────────┐  │
     │                        │    │  spawned task:                   │  │
     │                        │    │    predict(slot, input)          │──▶│
     │                        │    │    update_status(result)         │  │
     │                        │    │    → triggers webhook + cleanup  │  │
     │                        │    └──────────────────────────────────┘  │
     │                        │                       │                   │
     │◀─────────────────────webhook (completed)───────│                   │
Idempotent PUT (PUT /predictions/{id})
──────────────────────────────────────
  Client                    Routes                Supervisor
     │                        │                       │
     │──PUT /predictions/X───▶│                       │
     │                        │──get_state("X")──────▶│
     │                        │                       │
     │         ┌──────────────┴──────────────┐       │
     │         │ if exists:                  │       │
     │◀────────│   return 202 + full state   │◀──────│
     │         │ else:                       │       │
     │         │   submit + run prediction   │       │
     │         └─────────────────────────────┘       │
Connection Drop (Sync Mode)
───────────────────────────
  Client                    Routes                Supervisor           Worker
     │                        │                       │                   │
     │──POST /predictions────▶│                       │                   │
     │                        │   (SyncPredictionGuard armed)            │
     │                        │──predict(slot)───────────────────────────▶│
     │                        │                       │                   │
     │ ✕ (connection drops)   │                       │                   │
     │                        │                       │                   │
     │                   guard.drop()                 │                   │
     │                        │──cancel_token.cancel()▶│                  │
     │                        │                       │──Cancel──────────▶│
     │                        │                       │◀──Cancelled───────│
     │                        │                       │                   │
═══════════════════════════════════════════════════════════════════════════════════
                         COG HTTP → COGLET INVOCATION
═══════════════════════════════════════════════════════════════════════════════════
  ┌─────────────────────────────────────────────────────────────────────────────┐
  │                        cog predict / cog run                                │
  │                               (CLI)                                         │
  └─────────────────────────────────┬───────────────────────────────────────────┘
                                    │
                                    ▼
  ┌─────────────────────────────────────────────────────────────────────────────┐
  │                     python -m cog.server.http                               │
  │                                                                             │
  │   if USE_COGLET env var:                                                    │
  │       import coglet                                                         │
  │       coglet.serve(predictor_ref, port=5000)  ──────────────────────────┐   │
  │   else:                                                                 │   │
  │       # original Python FastAPI server                                  │   │
  │       uvicorn.run(app, port=5000)                                       │   │
  └─────────────────────────────────────────────────────────────────────────┼───┘
                                                                            │
                                                                            ▼
  ┌─────────────────────────────────────────────────────────────────────────────┐
  │                          coglet (Rust)                                      │
  │                                                                             │
  │   ┌───────────────────────────────────────────────────────────────────┐     │
  │   │  HTTP Server (axum)  :5000                                        │     │
  │   │    /predictions, /health-check, etc.                              │     │
  │   └───────────────────────────────────────────────────────────────────┘     │
  │                              │                                              │
  │                              ▼                                              │
  │   ┌───────────────────────────────────────────────────────────────────┐     │
  │   │  PredictionService + Supervisor                                   │     │
  │   └───────────────────────────────────────────────────────────────────┘     │
  │                              │                                              │
  │                    Unix socket + pipes                                      │
  │                              │                                              │
  │                              ▼                                              │
  │   ┌───────────────────────────────────────────────────────────────────┐     │
  │   │  Worker subprocess (Python)                                       │     │
  │   │    - loads predictor_ref                                          │     │
  │   │    - runs setup()                                                 │     │
  │   │    - handles predict() requests                                   │     │
  │   └───────────────────────────────────────────────────────────────────┘     │
  └─────────────────────────────────────────────────────────────────────────────┘
  Why?
  ────
  • Rust HTTP server (axum) is faster, handles backpressure better
  • Worker isolation: Python crash doesn't kill server
  • Same API surface as original cog (drop-in replacement)
  • Subprocess reuse: predictor stays loaded between requests

@michaeldwan (Member) commented:

in an effort to keep this as small as it can be (lololol) can we leave the coglet dir in the repo for now. Or if necessary, remove the go code and leave the python code? Would be nice to merge this as an addition only and then start wiring it up to the build/wheel selection system. I'd also like to salvage the python sdk portion of coglet since the complex type system and schema worked well for pipelines.

@tempusfrangit (Member, Author) replied:

> in an effort to keep this as small as it can be (lololol) can we leave the coglet dir in the repo for now. Or if necessary, remove the go code and leave the python code? Would be nice to merge this as an addition only and then start wiring it up to the build/wheel selection system. I'd also like to salvage the python sdk portion of coglet since the complex type system and schema worked well for pipelines.

The Python section of the code is the problem, as it conflicts with the new wheel. I'll get a rename in place instead of removal.

@markphelps (Contributor) left a comment:
Most of my comments are enum-cleanup related, because I just read that chapter in the Book of Rust.

@mfainberg-cf commented:

Yes, the enum change was lost in translation. In a number of other places I tried to ensure we had enums. I'll get an add-on patch to make the conversions you highlighted, unless you're interested in doing it.

markphelps added a commit that referenced this pull request Jan 20, 2026
Replace boolean soup with strongly-typed enums across four areas:

- PredictorKind enum (Class/StandaloneFunction) replacing 5 boolean flags
  in predictor.rs (is_async, is_async_gen, has_train, is_train_async,
  is_standalone_function)

- PredictionOutcome enum (Success/Failed/Cancelled) replacing success bool
  and error string matching in worker.rs

- HandlerMode enum (Predict/Train) replacing is_train boolean in
  worker_bridge.rs

- SlotState enum (Idle/SyncPrediction/AsyncPrediction) replacing 4 fields
  with state machine in worker_bridge.rs

Benefits:
- Invalid states impossible at compile time
- Eliminates string matching for cancellation
- Clearer semantics and self-documenting code
- Easier to extend with new variants

Addresses PR #2641 review feedback.
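The PredictionOutcome change described above can be sketched as follows (field types simplified to `String` for illustration): a single three-way enum replaces a `success: bool` plus error-string matching, so an "error present but success true" state cannot be constructed.

```rust
/// Sketch of PredictionOutcome: one value, three mutually exclusive outcomes.
#[derive(Debug)]
enum PredictionOutcome {
    Success { output: String },
    Failed { error: String },
    Cancelled,
}

impl PredictionOutcome {
    /// Pattern matching replaces string inspection: only Failed carries an error.
    fn error(&self) -> Option<&str> {
        match self {
            PredictionOutcome::Failed { error } => Some(error),
            _ => None,
        }
    }
}
```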
markphelps added a commit that referenced this pull request Jan 20, 2026
* refactor: replace boolean flags with Rust enums

Replace boolean soup with strongly-typed enums across four areas:

- PredictorKind enum (Class/StandaloneFunction) replacing 5 boolean flags
  in predictor.rs (is_async, is_async_gen, has_train, is_train_async,
  is_standalone_function)

- PredictionOutcome enum (Success/Failed/Cancelled) replacing success bool
  and error string matching in worker.rs

- HandlerMode enum (Predict/Train) replacing is_train boolean in
  worker_bridge.rs

- SlotState enum (Idle/SyncPrediction/AsyncPrediction) replacing 4 fields
  with state machine in worker_bridge.rs

Benefits:
- Invalid states impossible at compile time
- Eliminates string matching for cancellation
- Clearer semantics and self-documenting code
- Easier to extend with new variants

Addresses PR #2641 review feedback.

* fix: apply clippy suggestions

Use derive(Default) with #[default] attribute instead of manual impl.

* style: apply rustfmt formatting

* test: update tests to use PredictionOutcome enum

Replace direct field access (.success, .error) with pattern matching
on the PredictionOutcome enum.

* chore: rm deadcode, simplify start_sync_prediction logic

Signed-off-by: Mark Phelps <mphelps@cloudflare.com>

---------

Signed-off-by: Mark Phelps <mphelps@cloudflare.com>
@tempusfrangit tempusfrangit marked this pull request as ready for review January 21, 2026 05:41
@tempusfrangit tempusfrangit requested a review from a team as a code owner January 21, 2026 05:41
tempusfrangit pushed a commit that referenced this pull request Jan 21, 2026

Tasks for coglet development:
- build:rust / build:rust:release - Build Rust crates
- build:coglet - Build Python wheel with maturin (dev)
- build:coglet:wheel* - Build release wheels for various platforms
- test:rust - Run Rust tests with nextest
- test:coglet-python - Run Python integration tests
- fmt:rust / fmt:rust:check - Format Rust code
- clippy - Run lints
- deny - Check licenses/advisories

Tools added: cargo-binstall, cargo-deny, cargo-insta, cargo-nextest,
maturin, ruff, ty

Workspace structure:
- crates/Cargo.toml - workspace root with shared dependencies
- crates/coglet/ - main Rust library (empty scaffold)
- crates/deny.toml - cargo-deny license/advisory config
- crates/.gitignore - ignore build artifacts

Shared workspace dependencies defined for async runtime (tokio),
HTTP (axum, reqwest), serialization (serde), and error handling.
Runtime: tokio, tokio-util (codec), futures, async-trait
Serialization: serde, serde_json
HTTP: axum (server), reqwest + ureq (webhooks)
Utils: uuid, chrono, thiserror, anyhow, dashmap, tracing
Unix: nix (signal handling)
Dev: insta, wiremock, tower, http-body-util

Health enum: Unknown, Starting, Ready, Busy, SetupFailed, Defunct
SetupStatus enum with SetupResult for tracking setup phase
VersionInfo for runtime version reporting

Includes serde serialization with appropriate casing and tests.

Protocol types for parent-worker communication:
- ControlRequest/Response for control channel (cancel, shutdown, ready)
- SlotRequest/Response for per-slot data channel (predict, logs, output)
- SlotId using UUID for unique slot identification
- SlotOutcome for type-safe completion handling

JsonCodec wraps LengthDelimitedCodec with serde_json serialization.
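The wire format implied by wrapping LengthDelimitedCodec is a 4-byte big-endian length prefix followed by the payload (that prefix is the tokio-util default). A std-only sketch of just the framing layer, which frames raw bytes; the real codec additionally runs serde_json over the payload:

```rust
/// Prefix a payload with its length as a 4-byte big-endian header.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = (payload.len() as u32).to_be_bytes().to_vec();
    frame.extend_from_slice(payload);
    frame
}

/// Split one frame off the front of a buffer.
/// Returns (payload, remaining bytes), or None if the frame is incomplete
/// (the stream reader should wait for more data).
fn decode_frame(buf: &[u8]) -> Option<(&[u8], &[u8])> {
    if buf.len() < 4 {
        return None;
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None; // partial frame
    }
    Some((&buf[4..4 + len], &buf[4 + len..]))
}
```

This also shows why stray subprocess output on the same fd is fatal (the "frame size too big" failure described later): raw text bytes get interpreted as a length header.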

NamedSocketTransport: filesystem sockets (all Unix platforms)
AbstractSocketTransport: Linux abstract namespace (no filesystem)

Adds ControlRequest::Init variant for worker initialization with
transport info, predictor ref, slot count, and async/train flags.

PredictionStatus enum with terminal state detection.
PredictionOutput for single values or streamed chunks.
Prediction struct tracks lifecycle: status, logs, outputs, timing.
Completion notification via tokio Notify for async waiting.
CancellationToken re-exported from tokio_util for cancel propagation.
PredictionResult aggregates output, timing, and logs.
PredictionGuard provides RAII timing with cancellation support.
PredictionMetrics for timing data collection.
PredictionError for typed prediction failures.
PredictFn/AsyncPredictFn type aliases for predictor function signatures.

Also fixes PredictionOutput to use untagged serde serialization.
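The terminal-state detection described above can be sketched as a small enum method; the variant names assume cog's standard prediction statuses.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum PredictionStatus {
    Starting,
    Processing,
    Succeeded,
    Failed,
    Canceled,
}

impl PredictionStatus {
    /// Terminal states trigger the terminal webhook send and entry cleanup.
    fn is_terminal(self) -> bool {
        matches!(self, Self::Succeeded | Self::Failed | Self::Canceled)
    }
}
```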

PermitInUse/PermitIdle/PermitPoisoned enforce valid state transitions
at compile time - poisoned permits cannot become idle.

PermitPool manages slot permits with RAII semantics:
- Idle permits return to pool on drop
- Poisoned permits are orphaned, reducing capacity

AnyPermit enum for dynamic state storage.
PredictionSlot holds both together with separate concerns:
- Prediction behind Mutex for concurrent updates
- Permit for RAII pool return on drop

Slot transitions: into_idle() returns permit, into_poisoned() orphans it.
Drop handler fails non-terminal predictions if slot leaked.
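The RAII scheme described above can be sketched in std-only Rust: an idle permit returns itself to the pool on drop, while a poisoned permit is orphaned, permanently shrinking capacity. Names mirror the description; the real pool is async and the poisoning API here is an illustrative stand-in.

```rust
use std::sync::{Arc, Mutex};

struct PermitPool {
    idle: Mutex<Vec<usize>>, // slot ids available for new predictions
}

struct Permit {
    slot_id: usize,
    pool: Arc<PermitPool>,
    poisoned: bool,
}

impl PermitPool {
    fn new(slots: usize) -> Arc<Self> {
        Arc::new(Self { idle: Mutex::new((0..slots).collect()) })
    }

    /// Take an idle slot, or None if all slots are busy (-> Health::Busy).
    fn acquire(pool: &Arc<PermitPool>) -> Option<Permit> {
        let slot_id = pool.idle.lock().unwrap().pop()?;
        Some(Permit { slot_id, pool: Arc::clone(pool), poisoned: false })
    }

    fn available(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl Permit {
    /// A worker crash mid-prediction poisons the permit: it must never
    /// return to the pool, so capacity drops by one.
    fn poison(&mut self) { self.poisoned = true; }
}

impl Drop for Permit {
    fn drop(&mut self) {
        if !self.poisoned {
            self.pool.idle.lock().unwrap().push(self.slot_id);
        }
    }
}
```

Because release happens in Drop, a permit cannot leak on any exit path; the only way to lose capacity is the explicit poison transition.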

Add [coglet_rust] skip to integration tests that coglet cannot run:

Python <3.10 (9 tests):
- python37_deprecated.txtar (3.7)
- optional_input.txtar (3.8)
- setup_subprocess_multiprocessing.txtar (3.8)
- apt_packages.txtar (3.9)
- bad_dockerignore.txtar (3.9)
- torch_baseimage_fallback.txtar (3.9)
- torch_baseimage_no_cog_base.txtar (3.9)
- torch_baseimage_precompile.txtar (3.9)
- torch_cuda_baseimage.txtar (3.9)

Monobase/fast build (7 tests):
- build_base_image_sha.txtar
- build_cog_version_match.txtar
- build_localimage.txtar
- build_python313_base_image.txtar
- predict_localimage.txtar
- fast_build.txtar
- run_fast_build.txtar

coglet (Rust) requires Python >=3.10 and does not support monobase.

Use build:coglet:wheel:linux instead of build:coglet:wheel to ensure
manylinux2014 (GLIBC 2.17) compatibility. Building on modern Linux
systems picks up too-recent GLIBC symbols (2.18-2.39) which breaks
manylinux compliance. Zig cross-compilation targets the correct GLIBC version.

Use astral-sh/setup-uv for proper venv management

- Fix Secret.json_encode() to unwrap Secret objects before JSON serialization
  This fixes 'TypeError: Object of type Secret is not JSON serializable'
  when generating OpenAPI schemas with Secret input fields that have defaults
- Skip pydantic2_output test for cog-dataclass/coglet_rust since it explicitly
  tests Pydantic 2 BaseModel features
- Make cog-dataclass to_json_schema() mode-aware:
  - Accept Mode parameter (PREDICT or TRAIN)
  - Generate appropriate routes (/predictions or /trainings)
  - Use correct schema names (PredictionRequest/Response or TrainingRequest/Response)

- Update coglet-python to pass HandlerMode through schema generation:
  - Pass mode from worker_bridge to predictor.schema()
  - Convert Rust HandlerMode enum to Python Mode enum
  - Pass mode to both pydantic (schema_via_fastapi) and cog-dataclass (to_json_schema)

This fixes the "missing output definition for /trainings" error when using
coglet with training endpoints, ensuring training has its own distinct
Input/Output schemas based on the train() method signature.

- Change 'missing required input field' to 'Field required' in cog-dataclass
- Revert string_predictor.txtar test to expect 's: Field required'
- Fix mise.toml to use 'python -m pytest' to avoid hardcoded shebang issues

This ensures cog-dataclass validation errors match pydantic's format,
making the integration test pass.

When Python's setup() spawns subprocesses (subprocess.Popen), they inherit
fd 1 (stdout) which is the control channel pipe to the orchestrator. Subprocess
output writes directly into the pipe, corrupting JSON framing with raw text.

This caused "frame size too big" errors when the bash subprocess in
setup_subprocess_simple.txtar wrote 100 lines during setup.

Solution: File descriptor redirection early in worker startup
- Move control channel to high-numbered fds (99-101)
- Replace fd 1/2 with capture pipes
- Spawn threads to read pipes and route output through log system
- Subprocess logs flow to coglet::user target (like orphan logs)
- Log forwarder runs for entire worker lifetime

Changes:
- Add crates/coglet/src/fd_redirect.rs with fd manipulation
- Update worker.rs to use redirected fds for control channel
- Add RUST_LOG propagation in CLI commands (serve, predict, run)
- Add RUST_LOG support in integration test harness
- Add frame size and log forwarding trace logging for debugging
- Route control channel logs to coglet::user target

Safety contracts documented for all unsafe blocks. Passes clippy with no warnings.
Fixes setup_subprocess_simple integration test.

Standardize validation error messages across cog-dataclass and coglet:
- cog-dataclass: Change "missing required input field" to "Field required"
- coglet: Parse Pydantic ValidationError to extract "field: message" format
- Both now output simple "field: message" instead of verbose error details

Fixes integration test string_predictor validation error matching.

make_encodeable() now checks for model_dump() (Pydantic v2) before
falling back to dict() (Pydantic v1). This fixes train() functions
that return Pydantic BaseModel instances like TrainingOutput.

Fixes train_basic integration test.

Parse Pydantic ValidationError to extract simple "field: message" format
instead of verbose error details. For ValueError (cog-dataclass), the
message is already in the correct format.

Includes clippy fixes for collapsible if statements.

Fixes string_predictor integration test validation error matching.

Capture thread improvements:
- Remove all tracing calls from capture threads to prevent feedback loop
  (stderr is redirected to capture pipe, so tracing would capture itself)
- Update comments to reflect bounded channel (500 messages) instead of unbounded
- Align safety comment with header documentation about tokio runtime threads

Log writer updates:
- Change SetupLogSender to use bounded Sender instead of UnboundedSender
- Use blocking_send() for backpressure handling

Change test to expect "Field required" instead of "missing required input field"
to match the updated error message format.

Critical fixes:
1. Replace blocking_send() with try_send() in ControlChannelLogSender to prevent
   "Cannot block the current thread" panic when called from Python code on tokio
   runtime threads

2. Add DroppedLogs control message and periodic reporting (every 5s) to track
   logs dropped due to backpressure

3. Rename SetupLogSender -> ControlChannelLogSender with accurate documentation:
   - Lives for entire worker lifetime, not just setup
   - Handles both Python setup logs and subprocess output
   - Cleanup is no-op (sender persists)

4. Log dropped logs as warnings in orchestrator with human-readable intervals

Fixes integration test panics during setup phase.
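The actual fix uses tokio's `try_send()` on a bounded mpsc channel; the same never-block, count-and-report pattern can be sketched in Python with `queue.Queue` (class and field names here are assumptions for illustration, not the real implementation):

```python
import queue
import time

class DropCountingSender:
    """Analogue of try_send() + DroppedLogs: never block the caller;
    count drops under backpressure and report them periodically."""

    def __init__(self, maxsize=500, report_interval=5.0):
        self.q = queue.Queue(maxsize=maxsize)
        self.dropped = 0
        self.report_interval = report_interval
        self._last_report = time.monotonic()

    def send(self, msg):
        try:
            self.q.put_nowait(msg)   # equivalent of tokio's try_send()
        except queue.Full:
            self.dropped += 1        # backpressure: drop, never block
        now = time.monotonic()
        if self.dropped and now - self._last_report >= self.report_interval:
            # here the real code ships a DroppedLogs control message
            self._last_report = now
```

The key property is that `send()` can be called from any thread — including a runtime worker thread where blocking would panic — without ever waiting on the channel.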
Add custom tracing layer for worker subprocess that ships structured
tracing events over IPC to orchestrator, preserving component targets.

Changes:
- Add WorkerLog control message with target/level/message fields
- Implement WorkerTracingLayer that intercepts tracing events before formatting
- Initialize custom layer in worker with IPC channel sender
- Orchestrator re-emits logs with original target using low-level tracing API
- Optional fd 101 direct logging via RUST_WORKER_DIRECT_LOG=1 for debugging
- Increased channel depth 500 → 5000 for trace-level logging support

This replaces the previous approach where tracing went through capture pipes
with ANSI codes, losing component information and appearing as generic
'coglet::setup' source.

Note: Setup log accumulation is not yet implemented - logs are shipped and
re-emitted but not added to SetupResult. Will be addressed in a follow-up.

Filter out coglet::bridge::codec logs from IPC shipping to prevent
infinite loop where encoding a WorkerLog message triggers codec trace
logging, which creates another WorkerLog, which triggers more encoding, etc.

Changes:
- WorkerTracingLayer skips shipping logs with codec target
- Added SAFETY comment in codec.rs explaining the feedback loop risk
- Codec logs still visible via RUST_WORKER_DIRECT_LOG=1 for debugging
Adds SetupLogAccumulator tracing layer that captures all logs from the
coglet server process during the setup phase, from initial startup
through worker Ready.

Key implementation details:
- Tracing layer installed before any logs emitted (captures "coglet <version>" log)
- Accumulates orchestrator logs + re-emitted worker logs shipped over IPC
- Critical ordering: accumulate → Ready received → drain → flip Ready state
- Performance optimization: tx.is_closed() check prevents expensive ops after Ready
- Channel semantics: unbounded for setup phase, drainable even after close
- Log format: lines joined with \n separator, trailing newline at end

The accumulated logs are returned in SetupResult.logs and exposed via
/health-check endpoint for debugging setup failures.
Keep the setup log receiver alive through the 'Server ready' log,
then do a final drain to capture it before setting the setup result.

The 'Setup complete, now accepting requests' log happens after the
receiver is dropped, so it correctly does NOT appear in setup logs.

Test verifies both behaviors:
- 'Server ready' IS in setup logs
- 'Setup complete' is NOT in setup logs

Split coglet bindings CI runs by runtime and align predictor ref
parsing with cog loader semantics.

- Parse predictor refs to use file stem instead of full dotted path
  (e.g., 'path/to/predict.py:Predictor' -> module='predict')
- Split CI matrix to test both cog-dataclass and cog runtimes separately
- Add mise tasks for testing each runtime independently
- Ensures proper module import behavior matching Python's loader
Handle dataclass BaseModel outputs and unwrap pydantic iterators
before JSON encoding, plus rust-only setup log test guard.

- Add dataclass conversion in make_encodeable() for BaseModel outputs
- Unwrap pydantic serialization iterators in input processing and output serialization
- Guard setup_worker_tracing_logs.txtar test for rust-only coglet
- Add type safety check for numpy isinstance calls
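A sketch of the dataclass conversion step, assuming cog-dataclass BaseModel outputs are plain Python dataclasses (the pydantic serialization-iterator unwrapping is pydantic-internal and not shown):

```python
import dataclasses

def to_encodeable(obj):
    """Convert dataclass-based BaseModel outputs to plain dicts
    before JSON encoding (illustrative sketch)."""
    # is_dataclass is true for both classes and instances; only
    # convert instances, never the class object itself.
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        return {f.name: to_encodeable(getattr(obj, f.name))
                for f in dataclasses.fields(obj)}
    if isinstance(obj, (list, tuple)):
        return [to_encodeable(v) for v in obj]
    return obj
```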
@mfainberg-cf mfainberg-cf enabled auto-merge (squash) January 23, 2026 10:05
@mfainberg-cf mfainberg-cf disabled auto-merge January 23, 2026 10:05
@tempusfrangit tempusfrangit enabled auto-merge (squash) January 23, 2026 10:05
Contributor

@markphelps markphelps left a comment
🤘🏻

@tempusfrangit tempusfrangit merged commit 1d220a6 into main Jan 23, 2026
35 checks passed
@tempusfrangit tempusfrangit deleted the FFI branch January 23, 2026 14:26