Graph pool rebase 2026 #3999
Revert "Check" This reverts commit dddaf1b. Check graph pool TODO: -> this requires additional patch in MP to reset initialized_ flag in CalculatorGraph and verify if that works. Previous MP tests with reruns worked due to using AddVectorSink which changes the underlying graph and does not use OutputStreamPollers. Need to verify if change in MP will enable graph pool or we need to go back to thread pool. Rebase POC MP FW test POC part 2 WIP to stash
a4a8f32 to bb24c5c
525c8d8 to 79f91f3
79f91f3 to 002dc9b
0c50e49 to f895c20
…downgrade SPDLOG_ERROR to SPDLOG_DEBUG
… queue-size-0 rejection, warn+clamp for exceeding hw threads, log cleanup
Pull request overview
This PR introduces a MediaPipe graph pooling mechanism (“GraphQueue”) to reuse pre-initialized CalculatorGraph instances across requests, adds a # OVMS_GRAPH_QUEUE_SIZE: pbtxt directive (also emitted by graph exporters), and updates executors/tests/docs to exercise the new queue path (including LLM/OpenAI flows).
Changes:
- Add GraphQueue + observer-swapping mechanism to reuse MediaPipe graphs (unary + streaming) and track per-graph timestamps.
- Parse the `# OVMS_GRAPH_QUEUE_SIZE:` directive during graph validation and initialize the pool accordingly; update graph export to emit the directive.
- Expand and adjust unit/stress tests, configs, and docs to cover queue behavior and directive parsing.
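The pooling idea summarized above can be illustrated with a minimal sketch. This is not the PR's `GraphQueue` or `GraphIdGuard`; `PooledGraph`, `SimpleGraphPool`, and `PoolGuard` are hypothetical names, and the pooled object is a plain struct standing in for a pre-initialized `CalculatorGraph`. It only shows the general pattern of borrowing an instance through an RAII guard and returning it when the request finishes.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <vector>

// Hypothetical stand-in for a pre-initialized graph; not the MediaPipe type.
struct PooledGraph {
    int id = 0;
    long timestamp = 0;  // per-graph timestamp, advanced by each request
};

// Fixed-size pool that hands out indices of free graphs.
class SimpleGraphPool {
public:
    explicit SimpleGraphPool(std::size_t size) : graphs(size) {
        assert(size > 0);
        for (std::size_t i = 0; i < size; ++i) {
            graphs[i].id = static_cast<int>(i);
            freeIds.push(i);
        }
    }
    // Blocks until some graph is free, then returns its index.
    std::size_t acquire() {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this] { return !freeIds.empty(); });
        std::size_t id = freeIds.front();
        freeIds.pop();
        return id;
    }
    // Returns a graph to the pool and wakes one waiting request.
    void release(std::size_t id) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            freeIds.push(id);
        }
        cv.notify_one();
    }
    PooledGraph& at(std::size_t id) { return graphs[id]; }
    std::size_t available() {
        std::lock_guard<std::mutex> lock(mtx);
        return freeIds.size();
    }

private:
    std::vector<PooledGraph> graphs;
    std::queue<std::size_t> freeIds;
    std::mutex mtx;
    std::condition_variable cv;
};

// RAII guard: borrows a graph on construction, returns it on destruction.
class PoolGuard {
public:
    explicit PoolGuard(SimpleGraphPool& p) : pool(p), id(p.acquire()), graph(p.at(id)) {}
    ~PoolGuard() { pool.release(id); }
    PoolGuard(const PoolGuard&) = delete;
    PoolGuard& operator=(const PoolGuard&) = delete;

    SimpleGraphPool& pool;
    const std::size_t id;
    PooledGraph& graph;
};
```

A request handler would construct a `PoolGuard` at the start of a request, use `guard.graph` for the duration, and let the destructor return the instance, so an early return or an exception cannot leak a pooled graph.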
Reviewed changes
Copilot reviewed 47 out of 48 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| src/test/test_utils.hpp | Adjust test graph definition utilities for sidePacketMaps now being a pointer/shared object. |
| src/test/stress_test_utils.hpp | Add queue-enabled configs and timing instrumentation for stress tests. |
| src/test/streaming_test.cpp | Add StreamingQueueTest cases and update executor construction signature. |
| src/test/pythonnode_test.cpp | Update mocked executor construction to new side-packets + guard API. |
| src/test/pull_hf_model_test.cpp | Update graph header stripping to account for optional queue directive line. |
| src/test/mediapipeflow_test.cpp | Minor robustness/logging tweaks; add directive parsing tests. |
| src/test/mediapipe_framework_test.cpp | Add POC/perf-style tests around output stream observers and graph creation approaches. |
| src/test/mediapipe/graphdummyadapterfull_dummyinputnames_newpath.pbtxt | Add a new test graph file used for reload path testing. |
| src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames_newpath.pbtxt | Add queue-enabled test graph with directive header for reload path testing. |
| src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames.pbtxt | Add queue-enabled test graph with directive header. |
| src/test/mediapipe/graph_gpt_with_queue.pbtxt | Add queue-enabled OpenAI mock graph with directive header. |
| src/test/mediapipe/config_mediapipe_openai_chat_completions_mock_with_queue.json | Add config pointing to queue-enabled OpenAI mock graph. |
| src/test/mediapipe/config_mediapipe_openai_chat_completions_mock.json | Formatting-only change. |
| src/test/llm/lm_cb_regular_queue.pbtxt | Add queue-enabled LLM test graph with directive header. |
| src/test/llm/llmnode_test.cpp | Add LLM HTTP tests using queue-enabled graph/config and reuse behavior. |
| src/test/llm/config_queue.json | Add LLM queue test config. |
| src/test/http_openai_handler_test.cpp | Add OpenAI handler tests for queue-enabled config and clarify non-queue behavior. |
| src/test/graph_export_test.cpp | Update graph export tests to validate queue header and strip generated headers robustly. |
| src/test/ensemble_config_change_stress.cpp | Add stress tests for queue-enabled graph add/remove/reload scenarios. |
| src/systeminfo.hpp | Minor formatting change. |
| src/systeminfo.cpp | Make getCoreCount() robust when hardware_concurrency() returns 0. |
| src/python/BUILD | Widen visibility of pythonnoderesources target (now public). |
| src/mediapipe_internal/outputstreamobserver.hpp | Introduce output stream observer interface, null observer, and holder. |
| src/mediapipe_internal/mediapipegraphexecutor.hpp | Add queue-aware execution paths, observer swapping, and LLM execution context handling. |
| src/mediapipe_internal/mediapipegraphexecutor.cpp | Implement queue-aware constructor and LLM execution context lifecycle helpers. |
| src/mediapipe_internal/mediapipegraphdefinition.hpp | Change side packets storage to shared ptr; add queue member and queue init hooks. |
| src/mediapipe_internal/mediapipegraphdefinition.cpp | Implement directive parsing and queue initialization; integrate into validation/reload/retire. |
| src/mediapipe_internal/mediapipegraphconfig.hpp | Add GraphQueueSizeValue representation and queue size resolution helpers. |
| src/mediapipe_internal/graphqueue.hpp | Add GraphQueue, GraphHelper, and GraphIdGuard API. |
| src/mediapipe_internal/graphqueue.cpp | Implement graph pool initialization, observer installation, and cleanup. |
| src/mediapipe_internal/graph_side_packets.hpp | Split side packet maps into a dedicated header including LLM execution context holders. |
| src/mediapipe_internal/graph_executor_constants.hpp | Centralize side packet tags and timestamp constants. |
| src/logging.cpp | Adjust default log pattern to use %f fractional seconds. |
| src/llm/http_llm_calculator.cc | Add optional shared execution-context side packet support and mutex-protected access. |
| src/llm/BUILD | Add dependency on new graph_side_packets library. |
| src/kfs_frontend/kfs_graph_executor_impl.hpp | Declare requestHasInputSidePackets helper for queue-stream rejection. |
| src/kfs_frontend/kfs_graph_executor_impl.cpp | Implement requestHasInputSidePackets and adjust timestamp/side-packet handling. |
| src/http_frontend/http_graph_executor_impl.hpp | Declare requestHasInputSidePackets for HTTP payload. |
| src/http_frontend/http_graph_executor_impl.cpp | Stub requestHasInputSidePackets for HTTP payload. |
| src/graph_export/graph_export.cpp | Emit queue directive line in generated graph headers. |
| src/graph_export/BUILD | Add systeminfo dependency. |
| src/cli_parser.cpp | Propagate restWorkers into export settings. |
| src/capi_frontend/server_settings.hpp | Add optional restWorkers to export settings. |
| src/BUILD | Add Bazel targets for new mediapipe_internal headers/libs and wire them into main build/test deps. |
| docs/mediapipe.md | Document graph pool feature and directive semantics. |
| demos/common/export_models/export_model.py | Prepend queue directive to exported graphs (but file currently contains an accidental build log). |
| demos/benchmark/v3/benchmark.py | Print latency percentiles in benchmark output. |
| common_settings.bzl | Remove -Wno-deprecated-declarations from static test copts. |
| float expVal = 13.5; | ||
| std::vector<float> data{expVal - 1, 1, 2, 3, 4, 5, 6, 7, 8, 9}; | ||
| ovms::Timer<3> timer; | ||
| const std::string outputName{"output"}; | ||
| int N = 1000; | ||
|
|
||
| absl::Status absStatus; | ||
| // here starts new case of ovms | ||
| { // new case of ovms | ||
| ::mediapipe::CalculatorGraph graph; | ||
| EXPECT_EQ(graph.Initialize(graphConfig).code(), absl::StatusCode::kOk); | ||
| auto inputTensor = std::make_unique<ov::Tensor>(datatype, shape, data.data()); | ||
| // Install NullObserver | ||
| // its not per graph but per output | ||
| std::shared_ptr<ovms::OutputStreamObserverI> perGraphObserverFunctor = std::make_shared<NullOutputStreamObserver>(); | ||
| MP_ERROR_STOP(graph.ObserveOutputStream(outputStreamName, [&perGraphObserverFunctor](const ::mediapipe::Packet& packet) -> absl::Status { return perGraphObserverFunctor->handlePacket(packet); })); | ||
| // Here ends model management | ||
| // Here starts mp graph executor | ||
| // ovms::GraphIdGuard graphIdGuard(queue); // TODO timeout? | ||
| // get graphIdGuard from queue | ||
| // create FrontendAppropriateObserver | ||
| struct MyFunctor : public OutputStreamObserverI { | ||
| float expVal; | ||
| MyFunctor(float expVal) : | ||
| expVal(expVal) { | ||
| } | ||
| absl::Status handlePacket(const ::mediapipe::Packet& packet) override { | ||
| const ov::Tensor& outputTensor = | ||
| packet.Get<ov::Tensor>(); | ||
| auto datatype = ov::element::Type_t::f32; | ||
| EXPECT_EQ(datatype, outputTensor.get_element_type()); | ||
| EXPECT_THAT(outputTensor.get_shape(), testing::ElementsAre(1, 10)); | ||
| const void* outputData = outputTensor.data(); | ||
| EXPECT_EQ(*((float*)outputData), expVal); | ||
| return absl::OkStatus(); | ||
| } | ||
| }; | ||
| absStatus = graph.StartRun({}); | ||
| { | ||
| perGraphObserverFunctor = std::make_shared<MyFunctor>(expVal); | ||
| auto copyOfMyFunctor = perGraphObserverFunctor; | ||
| auto inputTensor = std::make_unique<ov::Tensor>(datatype, shape, data.data()); | ||
| MP_ERROR_STOP(graph.AddPacketToInputStream( | ||
| inputStreamName, Adopt(inputTensor.release()).At(Timestamp(timestamp++)))); | ||
| } | ||
| std::this_thread::sleep_for(std::chrono::seconds(1)); | ||
| timer.start(0); | ||
| for (auto i = 0; i < N; ++i) { // iter begin | ||
| perGraphObserverFunctor = std::make_shared<MyFunctor>(expVal); | ||
| auto copyOfMyFunctor = perGraphObserverFunctor; | ||
| auto inputTensor = std::make_unique<ov::Tensor>(datatype, shape, data.data()); | ||
| MP_ERROR_STOP(graph.AddPacketToInputStream( | ||
| inputStreamName, Adopt(inputTensor.release()).At(Timestamp(timestamp++)))); | ||
| MP_ERROR_STOP(graph.WaitUntilIdle()); | ||
| } // iter end | ||
| timer.stop(0); | ||
| } // end of new case ovms | ||
| { // current ovms case |
This test includes a hardcoded sleep_for(1s) and runs 1000 iterations of graph execution (plus a thread-pool comparison). That will significantly slow down the unit test suite and can be flaky across CI environments. Consider moving this into a benchmark/perf test, gating it behind a flag, or reducing it to a small functional assertion without timing loops/sleeps.
Hidden behind SKIP_AND_EXIT_IF_NOT_RUNNING_UNSTABLE(). It will be useful to have for detecting MediaPipe changes.
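The test snippet above registers one callback with the graph and then swaps the observer that the callback forwards to on each iteration. A minimal sketch of that pattern follows, with the packet simplified to an `int` and all names (`Observer`, `ObserverSlot`, `makeCallback`) hypothetical; it does not use MediaPipe types and adds a mutex that the test itself does not have.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical observer interface; a packet is simplified to an int.
struct Observer {
    virtual ~Observer() = default;
    virtual bool handlePacket(int packet) = 0;
};

// Installed while no request owns the graph; silently drops packets.
struct NullObserver : Observer {
    bool handlePacket(int) override { return true; }
};

// Example per-request observer that accumulates what it receives.
struct SumObserver : Observer {
    int sum = 0;
    bool handlePacket(int packet) override {
        sum += packet;
        return true;
    }
};

// Holds the currently active observer; the registered callback never changes.
class ObserverSlot {
public:
    ObserverSlot() : current(std::make_shared<NullObserver>()) {}
    void swap(std::shared_ptr<Observer> next) {
        assert(next != nullptr);
        std::lock_guard<std::mutex> lock(mtx);
        current = std::move(next);
    }
    bool dispatch(int packet) {
        std::shared_ptr<Observer> target;
        {
            // Copy under the lock so a concurrent swap cannot destroy the target mid-call.
            std::lock_guard<std::mutex> lock(mtx);
            target = current;
        }
        return target->handlePacket(packet);
    }

private:
    std::mutex mtx;
    std::shared_ptr<Observer> current;
};

// The callback is created once (as with a one-time stream registration)
// and forwards to whatever observer is installed at the time of the call.
std::function<bool(int)> makeCallback(ObserverSlot& slot) {
    return [&slot](int packet) { return slot.dispatch(packet); };
}
```

The slot must outlive the callback, since the lambda captures it by reference; in the test above the same constraint applies to `perGraphObserverFunctor`.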
| #pragma warning(pop) | ||
| #include "mediapipe_utils.hpp" | ||
| #include "packettypes.hpp" | ||
| #include "graphqueue.hpp" |
outputstreamobserver.hpp includes graphqueue.hpp, but this header doesn’t appear to use any GraphQueue/GraphIdGuard types. Keeping this include creates a circular dependency (graphqueue.hpp also includes outputstreamobserver.hpp) and increases build times. Consider removing the #include "graphqueue.hpp" here and forward-declaring only what’s needed.
| #include "graphqueue.hpp" |
| GraphHelper(GraphHelper&& gh) : | ||
| graph(std::move(gh.graph)), | ||
| outStreamObservers(std::move(const_cast<std::unordered_map<std::string, std::shared_ptr<ObserverHolder>>&>(gh.outStreamObservers))), | ||
| genAiExecutionContextMap(std::move(gh.genAiExecutionContextMap)), | ||
| currentTimestamp(gh.currentTimestamp) {} |
GraphHelper's move constructor casts away constness and moves from a const member (outStreamObservers). Modifying an object declared const is undefined behavior, even in a moved-from state. Consider removing the custom move ctor and keeping outStreamObservers non-const, or store the observer map behind a std::shared_ptr<const Map> so the keys are logically immutable without const_cast.
I could make the map non-const, but realistically it won't be changed anywhere outside this place. Since it works, I would leave it as is; if pressed, I will remove the const from the map.
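For reference, the alternative raised in this thread (a non-const member with a defaulted move constructor) could look roughly like the sketch below. `Helper` and `Holder` are hypothetical stand-ins for `GraphHelper` and `ObserverHolder`, reduced to the one member under discussion; the map stays read-only to callers through a const accessor rather than a const data member.

```cpp
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for ObserverHolder.
struct Holder {
    int value = 0;
};

class Helper {
public:
    using ObserverMap = std::unordered_map<std::string, std::shared_ptr<Holder>>;

    explicit Helper(ObserverMap initial) : observers(std::move(initial)) {}

    // The member is non-const, so the compiler-generated move is well defined
    // and no const_cast is needed.
    Helper(Helper&&) = default;
    Helper& operator=(Helper&&) = default;

    // Callers only get read access, which keeps the map logically immutable.
    const ObserverMap& getObservers() const { return observers; }

private:
    ObserverMap observers;
};
```

The trade-off is that immutability is enforced by the class interface instead of by the type of the member, which is usually enough when the member is private.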
| Status inferWithQueue(const RequestType* request, ResponseType* response, ExecutionContext executionContext, MetricCounterGuard& failedRequestsGuard) { | ||
| ::mediapipe::CalculatorGraph& graph = this->guard->graph; | ||
| auto llmContextStatus = initializeLlmExecutionContexts(this->guard->gh->genAiExecutionContextMap); | ||
| if (!llmContextStatus.ok()) { | ||
| return llmContextStatus; | ||
| } | ||
| for (auto& name : this->outputNames) { |
In the queue path for unary inference, input side packets from the request are currently not rejected (only the streaming queue path checks requestHasInputSidePackets). This can silently ignore user-provided side packets while the non-queue path would apply them via StartRun(). Add the same requestHasInputSidePackets check (and return a clear error) in inferWithQueue to keep behavior consistent and avoid surprising clients.
Both queue paths have this check. We do not have this limitation for non-queue graphs.
67c1f3b to c849bfa
…bled
- Move resolveGraphQueueSize() before dryInitializeTest() in validate() flow so the check runs even when calculators are not registered in the binary.
- Add precondition guard: return INTERNAL_ERROR if chosenConfig is empty.
- Reject graphs that combine PythonExecutorCalculator + LOOPBACK stream with OVMS_GRAPH_QUEUE_MAX_SIZE > 0 or AUTO. Generative Python nodes hold per-request iterator state incompatible with pooled graph instances.
- Add 4 unit tests covering: explicit queue + loopback rejected, AUTO + loopback rejected, Python without loopback allowed, loopback with queue disabled allowed.
- Document the incompatibility in docs/mediapipe.md and docs/python_support/reference.md.
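The rejection rule in the commit message above can be sketched as a small predicate. This is an illustration only: `NodeSketch`, `hasLoopbackInput`, and `isQueueCompatible` are hypothetical names, the graph is reduced to a list of nodes with input stream strings, and detecting a loopback by a `LOOPBACK:` tag prefix is an assumption rather than something the PR text confirms.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical, simplified view of one graph node.
struct NodeSketch {
    std::string calculator;
    std::vector<std::string> inputStreams;  // e.g. "TAG:stream_name"
};

// Assumption: a loopback stream is recognized by its "LOOPBACK:" tag prefix.
bool hasLoopbackInput(const NodeSketch& node) {
    return std::any_of(node.inputStreams.begin(), node.inputStreams.end(),
        [](const std::string& s) { return s.rfind("LOOPBACK:", 0) == 0; });
}

// queueRequested is true for an explicit size > 0 or AUTO, false when disabled.
bool isQueueCompatible(const std::vector<NodeSketch>& nodes, bool queueRequested) {
    if (!queueRequested) {
        return true;  // loopback with the queue disabled is allowed
    }
    return std::none_of(nodes.begin(), nodes.end(), [](const NodeSketch& n) {
        return n.calculator == "PythonExecutorCalculator" && hasLoopbackInput(n);
    });
}
```

The three outcomes exercised by such a predicate mirror the unit-test cases listed in the commit: a Python node with loopback and a requested queue is rejected, the same graph with the queue disabled is accepted, and a Python node without loopback is accepted.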
…ill-switch
- Changed image generation graph queue size from 1 to AUTO (uniform for all graph types)
- Added OVMS_GRAPH_QUEUE_OFF=1 runtime kill-switch env var to disable graph pools
- Removed unused getDefaultGraphQueueSizeDirective and buildGraphHeader parameter
- Added 2 unit tests for env var kill-switch behavior
- Updated docs/mediapipe.md with corrected export_model.py info and kill-switch docs
- Switched from raw std::getenv to project GetEnvVar utility
…pool_2026 # Conflicts: # src/systeminfo.cpp
| - In `export_model.py`: image generation graphs use `1`, and all other graph types use `AUTO`. | ||
| - In OVMS `--task ...` graph export: image generation graphs use `1`, and all other graph types use `AUTO`. | ||
| - OVMS `--task ...` graph export emits `# OVMS_GRAPH_QUEUE_MAX_SIZE: AUTO` for all graph types. | ||
| - `demos/common/export_models/export_model.py` does not emit this directive (graph pool disabled for graphs created by it). |
Align Python export_model.py graph templates with C++ CLI path. All 6 templates (text_gen, image_gen, embeddings, rerank, t2s, s2t) now emit the queue directive.
Conflicts resolved:
- graph_side_packets.hpp: keep both <utility> and <vector> includes
- graph_export_test.cpp: keep queue-aware test helpers
- pull_hf_model_test.cpp: use global removeGeneratedGraphHeaders

Additional fixes:
- mediapipegraphdefinition.cpp: sidePacketMaps is shared_ptr, use -> not .
- light_test_utils.hpp: rename removeVersionString to removeGeneratedGraphHeaders (strips both version line and queue directive)
- lora_graph_export_test.cpp: update all call sites
The graph queue pre-initializes a pool of MediaPipe graph instances that are reused across requests, avoiding the overhead of creating and destroying graphs per request. Queue size is resolved during graph validation.

Decision logic (in priority order)

1. Explicit pbtxt directive - Add a comment in the graph .pbtxt file:
   - `# OVMS_GRAPH_QUEUE_SIZE: AUTO` - pool sized to `std::thread::hardware_concurrency()`
   - `# OVMS_GRAPH_QUEUE_SIZE: <N>` - pool of exactly N graphs (must be ≤ hardware threads)
   - `# OVMS_GRAPH_QUEUE_SIZE: 0` - explicitly disable pool

   This directive is always honored, regardless of env var or calculator type.
2. Environment variable `OVMS_GRAPH_QUEUE_OFF=1` - Suppresses the auto-enable default. Does not override an explicit pbtxt directive. If there is neither a directive comment in the pbtxt file nor the environment variable set, the graph pool is disabled by default.
3. Python calculator - If the graph contains PythonExecutorCalculator, the pool is automatically disabled (not yet safe for reuse).
4. Default - If none of the above apply, the pool is enabled with AUTO sizing.
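The first step of the decision logic above (reading the pbtxt directive) can be sketched as follows. This is a simplified illustration, not the PR's parser: `resolveQueueSizeFromPbtxt` is a hypothetical name, the hardware thread count is passed in as a parameter so the function is deterministic, values above the thread count are clamped (following the "warn+clamp" wording in one of the commit titles), and a malformed value is treated as "no directive" where a real implementation would more likely report an error. The environment variable and Python calculator steps are left out.

```cpp
#include <algorithm>
#include <cctype>
#include <cstddef>
#include <optional>
#include <sstream>
#include <string>

// Returns std::nullopt when no usable directive is found, 0 when the pool is
// explicitly disabled, and the resolved pool size otherwise.
std::optional<std::size_t> resolveQueueSizeFromPbtxt(const std::string& pbtxt, std::size_t hwThreads) {
    static const std::string prefix = "# OVMS_GRAPH_QUEUE_SIZE:";
    const std::size_t maxSize = std::max<std::size_t>(hwThreads, 1);
    std::istringstream in(pbtxt);
    std::string line;
    while (std::getline(in, line)) {
        if (line.compare(0, prefix.size(), prefix) != 0) {
            continue;  // not the directive line
        }
        std::string value = line.substr(prefix.size());
        // Trim surrounding whitespace (also drops a trailing '\r').
        auto notSpace = [](unsigned char c) { return std::isspace(c) == 0; };
        value.erase(value.begin(), std::find_if(value.begin(), value.end(), notSpace));
        value.erase(std::find_if(value.rbegin(), value.rend(), notSpace).base(), value.end());
        if (value == "AUTO") {
            return maxSize;
        }
        const bool allDigits = !value.empty() && value.size() <= 9 &&
            std::all_of(value.begin(), value.end(),
                [](unsigned char c) { return std::isdigit(c) != 0; });
        if (!allDigits) {
            return std::nullopt;  // simplification: malformed value is ignored
        }
        const std::size_t requested = std::stoul(value);
        return std::min<std::size_t>(requested, maxSize);  // 0 stays 0 (disabled)
    }
    return std::nullopt;
}
```

A caller would fall through to the remaining steps of the priority list only when the result is `std::nullopt`, which keeps the "explicit directive always wins" rule in one place.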
Ticket:CVS-182707