Cog Server Rewrite #2641
Conversation
In an effort to keep this as small as it can be (lololol), can we leave the coglet dir in the repo for now? Or, if necessary, remove the Go code and leave the Python code? It would be nice to merge this as addition-only and then start wiring it up to the build/wheel selection system. I'd also like to salvage the Python SDK portion of coglet, since the complex type system and schema worked well for pipelines.
The Python section of the code is the problem, as it conflicts with the new wheel. I'll get a rename in place instead of a removal.
fe20808 to 03a0158
markphelps left a comment
Most of my comments are enum-cleanup related, because I just read that chapter in the Book of Rust.
Yes, the enum change was lost in translation. In a number of other places I tried to ensure we had enums. I'll get an add-on patch to make the conversions you highlighted, unless you're interested in doing it.
Replace boolean soup with strongly-typed enums across four areas:
- PredictorKind enum (Class/StandaloneFunction) replacing 5 boolean flags in predictor.rs (is_async, is_async_gen, has_train, is_train_async, is_standalone_function)
- PredictionOutcome enum (Success/Failed/Cancelled) replacing a success bool and error string matching in worker.rs
- HandlerMode enum (Predict/Train) replacing the is_train boolean in worker_bridge.rs
- SlotState enum (Idle/SyncPrediction/AsyncPrediction) replacing 4 fields with a state machine in worker_bridge.rs

Benefits:
- Invalid states are impossible at compile time
- Eliminates string matching for cancellation
- Clearer semantics and self-documenting code
- Easier to extend with new variants

Addresses PR #2641 review feedback.
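The PredictionOutcome part of this change can be sketched as a small, dependency-free enum. This is illustrative, not the actual coglet code: the variant names come from the commit message, while the helper methods are assumptions about how callers would use it. The point is that cancellation becomes a variant check rather than substring matching on an error string.

```rust
// Illustrative sketch of PredictionOutcome replacing `success: bool`
// plus error string matching: one type, three mutually exclusive states.
#[derive(Debug, PartialEq)]
enum PredictionOutcome {
    Success,
    Failed { error: String },
    Cancelled,
}

impl PredictionOutcome {
    /// Cancellation is now a variant check, not a substring match.
    fn is_cancelled(&self) -> bool {
        matches!(self, PredictionOutcome::Cancelled)
    }

    /// The error message only exists in the Failed state.
    fn error(&self) -> Option<&str> {
        match self {
            PredictionOutcome::Failed { error } => Some(error.as_str()),
            _ => None,
        }
    }
}
```

With the old `success: bool` plus `error: String` pair, a "successful prediction with an error message" was representable; with the enum it is not.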
* refactor: replace boolean flags with Rust enums (full message above)
* fix: apply clippy suggestions (use derive(Default) with the #[default] attribute instead of a manual impl)
* style: apply rustfmt formatting
* test: update tests to use the PredictionOutcome enum, replacing direct field access (.success, .error) with pattern matching on the enum
* chore: rm dead code, simplify start_sync_prediction logic

Signed-off-by: Mark Phelps <mphelps@cloudflare.com>
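The clippy fix mentioned above refers to a standard-library feature: since Rust 1.62, an enum can derive `Default` by marking one unit variant with `#[default]`, replacing a hand-written `impl Default`. A minimal sketch, using SlotState as the example (the real SlotState may carry data on its non-default variants):

```rust
// Since Rust 1.62, #[derive(Default)] works on enums when exactly one
// unit variant is marked #[default]; no manual impl Default is needed.
#[derive(Debug, Default, PartialEq)]
enum SlotState {
    #[default]
    Idle,
    SyncPrediction,
    AsyncPrediction,
}
```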
Tasks for coglet development:
- build:rust / build:rust:release - Build Rust crates
- build:coglet - Build Python wheel with maturin (dev)
- build:coglet:wheel* - Build release wheels for various platforms
- test:rust - Run Rust tests with nextest
- test:coglet-python - Run Python integration tests
- fmt:rust / fmt:rust:check - Format Rust code
- clippy - Run lints
- deny - Check licenses/advisories

Tools added: cargo-binstall, cargo-deny, cargo-insta, cargo-nextest, maturin, ruff, ty
Workspace structure:
- crates/Cargo.toml - workspace root with shared dependencies
- crates/coglet/ - main Rust library (empty scaffold)
- crates/deny.toml - cargo-deny license/advisory config
- crates/.gitignore - ignore build artifacts

Shared workspace dependencies defined for the async runtime (tokio), HTTP (axum, reqwest), serialization (serde), and error handling.
- Runtime: tokio, tokio-util (codec), futures, async-trait
- Serialization: serde, serde_json
- HTTP: axum (server), reqwest + ureq (webhooks)
- Utils: uuid, chrono, thiserror, anyhow, dashmap, tracing
- Unix: nix (signal handling)
- Dev: insta, wiremock, tower, http-body-util
- Health enum: Unknown, Starting, Ready, Busy, SetupFailed, Defunct
- SetupStatus enum with SetupResult for tracking the setup phase
- VersionInfo for runtime version reporting

Includes serde serialization with appropriate casing, plus tests.
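A std-only sketch of the Health enum and the kind of helper a health endpoint would hang off it. The variant list is from the commit message; the serialized strings and the `is_serving` helper are assumptions for illustration (the real code reportedly uses serde with a casing attribute rather than a hand-written mapping):

```rust
// Illustrative Health enum; exact wire names are an assumption here,
// standing in for serde's rename_all-style casing in the real code.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Health {
    Unknown,
    Starting,
    Ready,
    Busy,
    SetupFailed,
    Defunct,
}

impl Health {
    fn as_str(self) -> &'static str {
        match self {
            Health::Unknown => "UNKNOWN",
            Health::Starting => "STARTING",
            Health::Ready => "READY",
            Health::Busy => "BUSY",
            Health::SetupFailed => "SETUP_FAILED",
            Health::Defunct => "DEFUNCT",
        }
    }

    /// Hypothetical helper: only Ready/Busy states are serving traffic.
    fn is_serving(self) -> bool {
        matches!(self, Health::Ready | Health::Busy)
    }
}
```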
Protocol types for parent-worker communication:
- ControlRequest/Response for the control channel (cancel, shutdown, ready)
- SlotRequest/Response for the per-slot data channel (predict, logs, output)
- SlotId using UUID for unique slot identification
- SlotOutcome for type-safe completion handling

JsonCodec wraps LengthDelimitedCodec with serde_json serialization.
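The framing JsonCodec provides can be sketched without tokio: each JSON message is prefixed with a big-endian u32 length, which is LengthDelimitedCodec's default layout. To keep the sketch dependency-free, the payload here is an already-serialized JSON string rather than a serde value; function names are illustrative, not coglet's API.

```rust
// Minimal length-prefixed framing: 4-byte big-endian length, then body.
fn encode_frame(json: &str) -> Vec<u8> {
    let mut buf = (json.len() as u32).to_be_bytes().to_vec();
    buf.extend_from_slice(json.as_bytes());
    buf
}

// Returns the decoded message and any trailing bytes (the next frame),
// or None if the buffer does not yet hold a complete frame.
fn decode_frame(buf: &[u8]) -> Option<(&str, &[u8])> {
    let len = u32::from_be_bytes(buf.get(..4)?.try_into().ok()?) as usize;
    let body = buf.get(4..4 + len)?;
    Some((std::str::from_utf8(body).ok()?, &buf[4 + len..]))
}
```

This framing is also what makes the later "frame size too big" failure mode legible: if raw subprocess text lands in the pipe, its first four bytes get interpreted as a (usually huge) length.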
- NamedSocketTransport: filesystem sockets (all Unix platforms)
- AbstractSocketTransport: Linux abstract namespace (no filesystem)

Adds a ControlRequest::Init variant for worker initialization with transport info, predictor ref, slot count, and async/train flags.
- PredictionStatus enum with terminal-state detection
- PredictionOutput for single values or streamed chunks
- Prediction struct tracks the lifecycle: status, logs, outputs, timing
- Completion notification via tokio Notify for async waiting
- CancellationToken re-exported from tokio_util for cancel propagation
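Terminal-state detection can be sketched as a predicate on the status enum. The variant names below match cog's usual prediction lifecycle but are an assumption about this code; the shape of `is_terminal` is the point:

```rust
// Illustrative PredictionStatus; variant names are assumed from cog's
// usual prediction lifecycle, not read from the coglet source.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PredictionStatus {
    Starting,
    Processing,
    Succeeded,
    Failed,
    Canceled,
}

impl PredictionStatus {
    /// Terminal states never transition again, so a waiter (e.g. one
    /// parked on a tokio Notify) can stop waiting once it sees one.
    fn is_terminal(self) -> bool {
        matches!(
            self,
            PredictionStatus::Succeeded | PredictionStatus::Failed | PredictionStatus::Canceled
        )
    }
}
```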
- PredictionResult aggregates output, timing, and logs
- PredictionGuard provides RAII timing with cancellation support
- PredictionMetrics for timing data collection
- PredictionError for typed prediction failures
- PredictFn/AsyncPredictFn type aliases for predictor function signatures

Also fixes PredictionOutput to use untagged serde serialization.
PermitInUse/PermitIdle/PermitPoisoned enforce valid state transitions at compile time: poisoned permits cannot become idle.

PermitPool manages slot permits with RAII semantics:
- Idle permits return to the pool on drop
- Poisoned permits are orphaned, reducing capacity

AnyPermit enum for dynamic state storage.
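The compile-time guarantee is the typestate pattern: each state is its own type, and transitions consume `self`. A dependency-free sketch (method names are illustrative, not coglet's):

```rust
// Typestate sketch: an idle permit can be checked out, a used permit can
// come back idle or become poisoned, and PermitPoisoned deliberately has
// no methods, so "poisoned -> idle" simply does not compile.
struct PermitIdle;
struct PermitInUse;
struct PermitPoisoned;

impl PermitIdle {
    fn start(self) -> PermitInUse {
        PermitInUse
    }
}

impl PermitInUse {
    fn finish(self) -> PermitIdle {
        PermitIdle
    }
    fn poison(self) -> PermitPoisoned {
        PermitPoisoned
    }
}
// No impl for PermitPoisoned: the slot is orphaned for good.
```

Because each transition takes `self` by value, a caller also cannot keep using a permit after transitioning it; the borrow checker enforces linear use.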
PredictionSlot holds both together with separate concerns:
- Prediction behind a Mutex for concurrent updates
- Permit for RAII pool return on drop

Slot transitions: into_idle() returns the permit, into_poisoned() orphans it. The Drop handler fails non-terminal predictions if the slot leaked.
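The return-on-drop behavior can be illustrated with a counter standing in for the real PermitPool. This is a single-threaded sketch under assumed names (Pool, Slot); the check-then-decrement in `acquire` is not race-safe and only serves to show the RAII shape:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Counter standing in for the permit pool's available capacity.
struct Pool {
    idle: Arc<AtomicUsize>,
}

// A checked-out slot; dropping it returns capacity to the pool.
struct Slot {
    idle: Arc<AtomicUsize>,
}

impl Pool {
    fn new(capacity: usize) -> Self {
        Pool { idle: Arc::new(AtomicUsize::new(capacity)) }
    }

    // Single-threaded sketch of checkout, not a real semaphore.
    fn acquire(&self) -> Option<Slot> {
        if self.idle.load(Ordering::SeqCst) == 0 {
            return None;
        }
        self.idle.fetch_sub(1, Ordering::SeqCst);
        Some(Slot { idle: Arc::clone(&self.idle) })
    }
}

impl Drop for Slot {
    fn drop(&mut self) {
        // RAII: capacity comes back even if the holder leaked the slot
        // off a panic or early-return path.
        self.idle.fetch_add(1, Ordering::SeqCst);
    }
}
```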
Add [coglet_rust] skip to integration tests that coglet cannot run.

Python <3.10 (9 tests):
- python37_deprecated.txtar (3.7)
- optional_input.txtar (3.8)
- setup_subprocess_multiprocessing.txtar (3.8)
- apt_packages.txtar (3.9)
- bad_dockerignore.txtar (3.9)
- torch_baseimage_fallback.txtar (3.9)
- torch_baseimage_no_cog_base.txtar (3.9)
- torch_baseimage_precompile.txtar (3.9)
- torch_cuda_baseimage.txtar (3.9)

Monobase/fast build (7 tests):
- build_base_image_sha.txtar
- build_cog_version_match.txtar
- build_localimage.txtar
- build_python313_base_image.txtar
- predict_localimage.txtar
- fast_build.txtar
- run_fast_build.txtar

coglet (Rust) requires Python >=3.10 and does not support monobase.
Use build:coglet:wheel:linux instead of build:coglet:wheel to ensure manylinux2014 (GLIBC 2.17) compatibility. Building on modern Linux systems picks up too-recent GLIBC symbols (2.18-2.39), which breaks manylinux compliance. Zig cross-compilation targets the correct GLIBC version.
Use astral-sh/setup-uv for proper venv management
- Fix Secret.json_encode() to unwrap Secret objects before JSON serialization. This fixes 'TypeError: Object of type Secret is not JSON serializable' when generating OpenAPI schemas with Secret input fields that have defaults.
- Skip the pydantic2_output test for cog-dataclass/coglet_rust, since it explicitly tests Pydantic 2 BaseModel features.
- Make cog-dataclass to_json_schema() mode-aware:
  - Accept a Mode parameter (PREDICT or TRAIN)
  - Generate the appropriate routes (/predictions or /trainings)
  - Use the correct schema names (PredictionRequest/Response or TrainingRequest/Response)
- Update coglet-python to pass HandlerMode through schema generation:
  - Pass the mode from worker_bridge to predictor.schema()
  - Convert the Rust HandlerMode enum to the Python Mode enum
  - Pass the mode to both pydantic (schema_via_fastapi) and cog-dataclass (to_json_schema)

This fixes the "missing output definition for /trainings" error when using coglet with training endpoints, ensuring training has its own distinct Input/Output schemas based on the train() method signature.
- Change 'missing required input field' to 'Field required' in cog-dataclass
- Revert the string_predictor.txtar test to expect 's: Field required'
- Fix mise.toml to use 'python -m pytest' to avoid hardcoded shebang issues

This ensures cog-dataclass validation errors match pydantic's format, making the integration test pass.
When Python's setup() spawns subprocesses (subprocess.Popen), they inherit fd 1 (stdout), which is the control channel pipe to the orchestrator. Subprocess output writes directly into the pipe, corrupting JSON framing with raw text. This caused "frame size too big" errors when the bash subprocess in setup_subprocess_simple.txtar wrote 100 lines during setup.

Solution: file descriptor redirection early in worker startup.
- Move the control channel to high-numbered fds (99-101)
- Replace fd 1/2 with capture pipes
- Spawn threads to read the pipes and route output through the log system
- Subprocess logs flow to the coglet::user target (like orphan logs)
- The log forwarder runs for the entire worker lifetime

Changes:
- Add crates/coglet/src/fd_redirect.rs with fd manipulation
- Update worker.rs to use the redirected fds for the control channel
- Add RUST_LOG propagation in CLI commands (serve, predict, run)
- Add RUST_LOG support in the integration test harness
- Add frame-size and log-forwarding trace logging for debugging
- Route control channel logs to the coglet::user target

Safety contracts are documented for all unsafe blocks. Passes clippy with no warnings. Fixes the setup_subprocess_simple integration test.
Standardize validation error messages across cog-dataclass and coglet:
- cog-dataclass: change "missing required input field" to "Field required"
- coglet: parse Pydantic ValidationError to extract the "field: message" format
- Both now output a simple "field: message" instead of verbose error details

Fixes the string_predictor integration test's validation error matching.
make_encodeable() now checks for model_dump() (Pydantic v2) before falling back to dict() (Pydantic v1). This fixes train() functions that return Pydantic BaseModel instances like TrainingOutput. Fixes train_basic integration test.
Parse Pydantic ValidationError to extract simple "field: message" format instead of verbose error details. For ValueError (cog-dataclass), the message is already in the correct format. Includes clippy fixes for collapsible if statements. Fixes string_predictor integration test validation error matching.
Capture thread improvements:
- Remove all tracing calls from capture threads to prevent a feedback loop (stderr is redirected to the capture pipe, so tracing would capture itself)
- Update comments to reflect the bounded channel (500 messages) instead of unbounded
- Align the safety comment with the header documentation about tokio runtime threads

Log writer updates:
- Change SetupLogSender to use a bounded Sender instead of UnboundedSender
- Use blocking_send() for backpressure handling
Change test to expect "Field required" instead of "missing required input field" to match the updated error message format.
Critical fixes:
1. Replace blocking_send() with try_send() in ControlChannelLogSender to prevent the "Cannot block the current thread" panic when called from Python code on tokio runtime threads
2. Add a DroppedLogs control message and periodic reporting (every 5s) to track logs dropped due to backpressure
3. Rename SetupLogSender -> ControlChannelLogSender with accurate documentation:
   - Lives for the entire worker lifetime, not just setup
   - Handles both Python setup logs and subprocess output
   - Cleanup is a no-op (the sender persists)
4. Log dropped logs as warnings in the orchestrator with human-readable intervals

Fixes integration test panics during the setup phase.
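The try_send tradeoff can be shown without tokio using the standard library's bounded channel, which has the same non-blocking semantics: try_send returns immediately, and a full channel surfaces as an error the caller can count as a dropped log instead of blocking a runtime thread. The helper function and capacities here are illustrative:

```rust
use std::sync::mpsc::sync_channel;

// Push lines through a bounded channel without blocking; a full channel
// means the line is dropped and counted, mirroring the DroppedLogs idea.
fn send_counting_drops(lines: &[&str], capacity: usize) -> (usize, usize) {
    let (tx, rx) = sync_channel::<String>(capacity);
    let mut sent = 0;
    let mut dropped = 0;
    for line in lines {
        match tx.try_send(line.to_string()) {
            Ok(()) => sent += 1,
            // Channel full: drop rather than block the calling thread.
            Err(_) => dropped += 1,
        }
    }
    drop(rx);
    (sent, dropped)
}
```

With blocking_send the same full-channel situation would park the calling thread, which is exactly the panic condition when the caller is Python code running on a tokio runtime thread.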
Add a custom tracing layer for the worker subprocess that ships structured tracing events over IPC to the orchestrator, preserving component targets.

Changes:
- Add a WorkerLog control message with target/level/message fields
- Implement WorkerTracingLayer, which intercepts tracing events before formatting
- Initialize the custom layer in the worker with an IPC channel sender
- The orchestrator re-emits logs with the original target using the low-level tracing API
- Optional fd 101 direct logging via RUST_WORKER_DIRECT_LOG=1 for debugging
- Increase channel depth 500 → 5000 for trace-level logging support

This replaces the previous approach, where tracing went through capture pipes with ANSI codes, losing component information and appearing under the generic 'coglet::setup' source.

Note: setup log accumulation is not yet implemented - logs are shipped and re-emitted but not added to SetupResult. Will be addressed in a follow-up.
Filter out coglet::bridge::codec logs from IPC shipping to prevent an infinite loop where encoding a WorkerLog message triggers codec trace logging, which creates another WorkerLog, which triggers more encoding, and so on.

Changes:
- WorkerTracingLayer skips shipping logs with the codec target
- Added a SAFETY comment in codec.rs explaining the feedback-loop risk
- Codec logs are still visible via RUST_WORKER_DIRECT_LOG=1 for debugging
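The guard itself is a one-line target check before shipping an event over IPC. The target string is taken from the commit message; the function name is illustrative:

```rust
// Skip shipping any event emitted by the codec module: encoding the
// shipped WorkerLog would itself emit codec trace logs, and the loop
// would never terminate. Prefix match also covers submodules.
fn should_ship(target: &str) -> bool {
    !target.starts_with("coglet::bridge::codec")
}
```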
Adds a SetupLogAccumulator tracing layer that captures all logs from the coglet server process during the setup phase, from initial startup through worker Ready.

Key implementation details:
- The tracing layer is installed before any logs are emitted (captures the "coglet <version>" log)
- Accumulates orchestrator logs plus re-emitted worker logs shipped over IPC
- Critical ordering: accumulate → Ready received → drain → flip Ready state
- Performance optimization: a tx.is_closed() check prevents expensive ops after Ready
- Channel semantics: unbounded for the setup phase, drainable even after close
- Log format: lines joined with a \n separator, trailing newline at the end

The accumulated logs are returned in SetupResult.logs and exposed via the /health-check endpoint for debugging setup failures.
Keep the setup log receiver alive through the 'Server ready' log, then do a final drain to capture it before setting the setup result. The 'Setup complete, now accepting requests' log happens after the receiver is dropped, so it correctly does NOT appear in setup logs.

The test verifies both behaviors:
- 'Server ready' IS in setup logs
- 'Setup complete' is NOT in setup logs
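The drain-after-Ready ordering and the dropped-receiver cutoff can both be sketched with a std unbounded channel. This is a simplification under assumed names: logs accumulate on the channel, the receiver does one final non-blocking drain when Ready is observed, and anything sent after the receiver is consumed is lost, matching why 'Setup complete' never appears in setup logs.

```rust
// Consume the receiver, drain whatever is buffered, and freeze the
// result with a trailing newline, as described in the commit message.
fn drain_setup_logs(rx: std::sync::mpsc::Receiver<String>) -> String {
    let mut lines: Vec<String> = rx.try_iter().collect();
    lines.push(String::new()); // join() turns this into a trailing '\n'
    lines.join("\n")
}
```

Because `drain_setup_logs` takes the receiver by value, the channel closes when it returns; later sends fail, which is the sketch's analogue of post-setup logs not reaching the accumulator.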
Split coglet bindings CI runs by runtime and align predictor ref parsing with cog loader semantics.
- Parse predictor refs to use the file stem instead of the full dotted path (e.g., 'path/to/predict.py:Predictor' -> module='predict')
- Split the CI matrix to test the cog-dataclass and cog runtimes separately
- Add mise tasks for testing each runtime independently
- Ensures proper module import behavior, matching Python's loader
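The ref-parsing change can be sketched with the standard library's Path handling; the function name and (module, class) return shape are illustrative, not coglet's actual signature:

```rust
// Split "path/to/predict.py:Predictor" into ("predict", "Predictor"):
// the module name is the file stem of the path part, not the full
// dotted path, matching Python's loader semantics.
fn parse_predictor_ref(r: &str) -> Option<(String, String)> {
    let (path, class) = r.split_once(':')?;
    let stem = std::path::Path::new(path).file_stem()?.to_str()?;
    Some((stem.to_string(), class.to_string()))
}
```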
Handle dataclass BaseModel outputs and unwrap pydantic iterators before JSON encoding, plus a Rust-only setup log test guard.
- Add dataclass conversion in make_encodeable() for BaseModel outputs
- Unwrap pydantic serialization iterators in input processing and output serialization
- Guard the setup_worker_tracing_logs.txtar test as rust-only coglet
- Add a type-safety check for numpy isinstance calls
markphelps left a comment
🤘🏻
This is a complete rewrite of the cog HTTP server, moving away from Python to a Rust pyo3, ABI3 wheel. The aim is a significant uplift in control over how we run predictors, isolating the predictor run from the core interface used to run it.