Skip to content

Graph pool rebase 2026#3999

Merged
atobiszei merged 31 commits into
mainfrom
atobisze_check_graph_pool_2026
May 21, 2026
Merged

Graph pool rebase 2026#3999
atobiszei merged 31 commits into
mainfrom
atobisze_check_graph_pool_2026

Conversation

@atobiszei
Copy link
Copy Markdown
Collaborator

@atobiszei atobiszei commented Feb 20, 2026

The graph queue pre-initializes a pool of MediaPipe graph instances that are reused across requests, avoiding the overhead of creating and destroying graphs per request. Queue size is resolved during graph validation.

Decision logic (in priority order)
Explicit pbtxt directive - Add a comment in the graph .pbtxt file:

  • # OVMS_GRAPH_QUEUE_SIZE: AUTO - pool sized to std::thread::hardware_concurrency()
  • # OVMS_GRAPH_QUEUE_SIZE: <N> - pool of exactly N graphs (must be ≤ hardware threads)
  • # OVMS_GRAPH_QUEUE_SIZE: 0 - explicitly disable pool

This directive is always honored, regardless of env var or calculator type.
Environment variable OVMS_GRAPH_QUEUE_OFF=1 - Suppresses the auto-enable default. Does not override an explicit pbtxt directive.

If there is no comment in pbtxt file nor environment variable set then graph pool is disabled by default.

Python calculator - If the graph contains PythonExecutorCalculator, the pool is automatically disabled (not yet safe for reuse).

Default — If none of the above apply, the pool is enabled with AUTO sizing.

Ticket:CVS-182707

Revert "Check"

This reverts commit dddaf1b.

Check graph pool

TODO:
-> this requires additional patch in MP to reset initialized_ flag in
CalculatorGraph and verify if that works. Previous MP tests with reruns
worked due to using AddVectorSink which changes the underlying graph and
does not use OutputStreamPollers. Need to verify if change in MP will
enable graph pool or we need to go back to thread pool.

Rebase

POC MP FW test

POC part 2

WIP to stash
@atobiszei atobiszei force-pushed the atobisze_check_graph_pool_2026 branch from a4a8f32 to bb24c5c Compare February 23, 2026 10:04
@atobiszei atobiszei force-pushed the atobisze_check_graph_pool_2026 branch 3 times, most recently from 525c8d8 to 79f91f3 Compare February 23, 2026 14:53
@atobiszei atobiszei force-pushed the atobisze_check_graph_pool_2026 branch from 79f91f3 to 002dc9b Compare February 24, 2026 12:33
@atobiszei atobiszei force-pushed the atobisze_check_graph_pool_2026 branch from 0c50e49 to f895c20 Compare March 9, 2026 14:47
@atobiszei atobiszei marked this pull request as ready for review March 20, 2026 12:39
Copilot AI review requested due to automatic review settings March 20, 2026 12:39
@atobiszei atobiszei review requested due to automatic review settings March 20, 2026 12:40
… queue-size-0 rejection, warn+clamp for exceeding hw threads, log cleanup
Copilot AI review requested due to automatic review settings March 25, 2026 16:08
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a MediaPipe graph pooling mechanism (“GraphQueue”) to reuse pre-initialized CalculatorGraph instances across requests, adds a # OVMS_GRAPH_QUEUE_SIZE: pbtxt directive (also emitted by graph exporters), and updates executors/tests/docs to exercise the new queue path (including LLM/OpenAI flows).

Changes:

  • Add GraphQueue + observer-swapping mechanism to reuse MediaPipe graphs (unary + streaming) and track per-graph timestamps.
  • Parse # OVMS_GRAPH_QUEUE_SIZE: directive during graph validation and initialize the pool accordingly; update graph export to emit the directive.
  • Expand and adjust unit/stress tests, configs, and docs to cover queue behavior and directive parsing.

Reviewed changes

Copilot reviewed 47 out of 48 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
src/test/test_utils.hpp Adjust test graph definition utilities for sidePacketMaps now being a pointer/shared object.
src/test/stress_test_utils.hpp Add queue-enabled configs and timing instrumentation for stress tests.
src/test/streaming_test.cpp Add StreamingQueueTest cases and update executor construction signature.
src/test/pythonnode_test.cpp Update mocked executor construction to new side-packets + guard API.
src/test/pull_hf_model_test.cpp Update graph header stripping to account for optional queue directive line.
src/test/mediapipeflow_test.cpp Minor robustness/logging tweaks; add directive parsing tests.
src/test/mediapipe_framework_test.cpp Add POC/perf-style tests around output stream observers and graph creation approaches.
src/test/mediapipe/graphdummyadapterfull_dummyinputnames_newpath.pbtxt Add a new test graph file used for reload path testing.
src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames_newpath.pbtxt Add queue-enabled test graph with directive header for reload path testing.
src/test/mediapipe/graph_queue_dummyadapterfull_dummyinputnames.pbtxt Add queue-enabled test graph with directive header.
src/test/mediapipe/graph_gpt_with_queue.pbtxt Add queue-enabled OpenAI mock graph with directive header.
src/test/mediapipe/config_mediapipe_openai_chat_completions_mock_with_queue.json Add config pointing to queue-enabled OpenAI mock graph.
src/test/mediapipe/config_mediapipe_openai_chat_completions_mock.json Formatting-only change.
src/test/llm/lm_cb_regular_queue.pbtxt Add queue-enabled LLM test graph with directive header.
src/test/llm/llmnode_test.cpp Add LLM HTTP tests using queue-enabled graph/config and reuse behavior.
src/test/llm/config_queue.json Add LLM queue test config.
src/test/http_openai_handler_test.cpp Add OpenAI handler tests for queue-enabled config and clarify non-queue behavior.
src/test/graph_export_test.cpp Update graph export tests to validate queue header and strip generated headers robustly.
src/test/ensemble_config_change_stress.cpp Add stress tests for queue-enabled graph add/remove/reload scenarios.
src/systeminfo.hpp Minor formatting change.
src/systeminfo.cpp Make getCoreCount() robust when hardware_concurrency() returns 0.
src/python/BUILD Widen visibility of pythonnoderesources target (now public).
src/mediapipe_internal/outputstreamobserver.hpp Introduce output stream observer interface, null observer, and holder.
src/mediapipe_internal/mediapipegraphexecutor.hpp Add queue-aware execution paths, observer swapping, and LLM execution context handling.
src/mediapipe_internal/mediapipegraphexecutor.cpp Implement queue-aware constructor and LLM execution context lifecycle helpers.
src/mediapipe_internal/mediapipegraphdefinition.hpp Change side packets storage to shared ptr; add queue member and queue init hooks.
src/mediapipe_internal/mediapipegraphdefinition.cpp Implement directive parsing and queue initialization; integrate into validation/reload/retire.
src/mediapipe_internal/mediapipegraphconfig.hpp Add GraphQueueSizeValue representation and queue size resolution helpers.
src/mediapipe_internal/graphqueue.hpp Add GraphQueue, GraphHelper, and GraphIdGuard API.
src/mediapipe_internal/graphqueue.cpp Implement graph pool initialization, observer installation, and cleanup.
src/mediapipe_internal/graph_side_packets.hpp Split side packet maps into a dedicated header including LLM execution context holders.
src/mediapipe_internal/graph_executor_constants.hpp Centralize side packet tags and timestamp constants.
src/logging.cpp Adjust default log pattern to use %f fractional seconds.
src/llm/http_llm_calculator.cc Add optional shared execution-context side packet support and mutex-protected access.
src/llm/BUILD Add dependency on new graph_side_packets library.
src/kfs_frontend/kfs_graph_executor_impl.hpp Declare requestHasInputSidePackets helper for queue-stream rejection.
src/kfs_frontend/kfs_graph_executor_impl.cpp Implement requestHasInputSidePackets and adjust timestamp/side-packet handling.
src/http_frontend/http_graph_executor_impl.hpp Declare requestHasInputSidePackets for HTTP payload.
src/http_frontend/http_graph_executor_impl.cpp Stub requestHasInputSidePackets for HTTP payload.
src/graph_export/graph_export.cpp Emit queue directive line in generated graph headers.
src/graph_export/BUILD Add systeminfo dependency.
src/cli_parser.cpp Propagate restWorkers into export settings.
src/capi_frontend/server_settings.hpp Add optional restWorkers to export settings.
src/BUILD Add Bazel targets for new mediapipe_internal headers/libs and wire them into main build/test deps.
docs/mediapipe.md Document graph pool feature and directive semantics.
demos/common/export_models/export_model.py Prepend queue directive to exported graphs (but file currently contains an accidental build log).
demos/benchmark/v3/benchmark.py Print latency percentiles in benchmark output.
common_settings.bzl Remove -Wno-deprecated-declarations from static test copts.

Comment thread docs/mediapipe.md Outdated
Comment on lines +343 to +400
float expVal = 13.5;
std::vector<float> data{expVal - 1, 1, 2, 3, 4, 5, 6, 7, 8, 9};
ovms::Timer<3> timer;
const std::string outputName{"output"};
int N = 1000;

absl::Status absStatus;
// here starts new case of ovms
{ // new case of ovms
::mediapipe::CalculatorGraph graph;
EXPECT_EQ(graph.Initialize(graphConfig).code(), absl::StatusCode::kOk);
auto inputTensor = std::make_unique<ov::Tensor>(datatype, shape, data.data());
// Install NullObserver
// its not per graph but per output
std::shared_ptr<ovms::OutputStreamObserverI> perGraphObserverFunctor = std::make_shared<NullOutputStreamObserver>();
MP_ERROR_STOP(graph.ObserveOutputStream(outputStreamName, [&perGraphObserverFunctor](const ::mediapipe::Packet& packet) -> absl::Status { return perGraphObserverFunctor->handlePacket(packet); }));
// Here ends model management
// Here starts mp graph executor
// ovms::GraphIdGuard graphIdGuard(queue); // TODO timeout?
// get graphIdGuard from queue
// create FrontendAppropriateObserver
struct MyFunctor : public OutputStreamObserverI {
float expVal;
MyFunctor(float expVal) :
expVal(expVal) {
}
absl::Status handlePacket(const ::mediapipe::Packet& packet) override {
const ov::Tensor& outputTensor =
packet.Get<ov::Tensor>();
auto datatype = ov::element::Type_t::f32;
EXPECT_EQ(datatype, outputTensor.get_element_type());
EXPECT_THAT(outputTensor.get_shape(), testing::ElementsAre(1, 10));
const void* outputData = outputTensor.data();
EXPECT_EQ(*((float*)outputData), expVal);
return absl::OkStatus();
}
};
absStatus = graph.StartRun({});
{
perGraphObserverFunctor = std::make_shared<MyFunctor>(expVal);
auto copyOfMyFunctor = perGraphObserverFunctor;
auto inputTensor = std::make_unique<ov::Tensor>(datatype, shape, data.data());
MP_ERROR_STOP(graph.AddPacketToInputStream(
inputStreamName, Adopt(inputTensor.release()).At(Timestamp(timestamp++))));
}
std::this_thread::sleep_for(std::chrono::seconds(1));
timer.start(0);
for (auto i = 0; i < N; ++i) { // iter begin
perGraphObserverFunctor = std::make_shared<MyFunctor>(expVal);
auto copyOfMyFunctor = perGraphObserverFunctor;
auto inputTensor = std::make_unique<ov::Tensor>(datatype, shape, data.data());
MP_ERROR_STOP(graph.AddPacketToInputStream(
inputStreamName, Adopt(inputTensor.release()).At(Timestamp(timestamp++))));
MP_ERROR_STOP(graph.WaitUntilIdle());
} // iter end
timer.stop(0);
} // end of new case ovms
{ // current ovms case
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test includes a hardcoded sleep_for(1s) and runs 1000 iterations of graph execution (plus a thread-pool comparison). That will significantly slow down the unit test suite and can be flaky across CI environments. Consider moving this into a benchmark/perf test, gating it behind a flag, or reducing it to a small functional assertion without timing loops/sleeps.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it resolved? @atobiszei

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hidden behind:
SKIP_AND_EXIT_IF_NOT_RUNNING_UNSTABLE()

It will be useful to have to detecting Mediapipe changes.

Comment thread src/python/BUILD Outdated
Comment thread demos/common/export_models/export_model.py Outdated
#pragma warning(pop)
#include "mediapipe_utils.hpp"
#include "packettypes.hpp"
#include "graphqueue.hpp"
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outputstreamobserver.hpp includes graphqueue.hpp, but this header doesn’t appear to use any GraphQueue/GraphIdGuard types. Keeping this include creates a circular dependency (graphqueue.hpp also includes outputstreamobserver.hpp) and increases build times. Consider removing the #include "graphqueue.hpp" here and forward-declaring only what’s needed.

Suggested change
#include "graphqueue.hpp"

Copilot uses AI. Check for mistakes.
Comment on lines +62 to +66
GraphHelper(GraphHelper&& gh) :
graph(std::move(gh.graph)),
outStreamObservers(std::move(const_cast<std::unordered_map<std::string, std::shared_ptr<ObserverHolder>>&>(gh.outStreamObservers))),
genAiExecutionContextMap(std::move(gh.genAiExecutionContextMap)),
currentTimestamp(gh.currentTimestamp) {}
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GraphHelper's move constructor casts away constness and moves from a const member (outStreamObservers). Modifying an object declared const is undefined behavior, even in a moved-from state. Consider removing the custom move ctor and keeping outStreamObservers non-const, or store the observer map behind a std::shared_ptr<const Map> so the keys are logically immutable without const_cast.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator

@dkalinowski dkalinowski May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point @atobiszei

Copy link
Copy Markdown
Collaborator Author

@atobiszei atobiszei May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could change the map to non-const but it realistically won't be changed anywhere outside this place. Since it works I would leave it as is, if pressed remove the const from map.

Comment thread src/mediapipe_internal/outputstreamobserver.hpp Outdated
Comment on lines +180 to +186
Status inferWithQueue(const RequestType* request, ResponseType* response, ExecutionContext executionContext, MetricCounterGuard& failedRequestsGuard) {
::mediapipe::CalculatorGraph& graph = this->guard->graph;
auto llmContextStatus = initializeLlmExecutionContexts(this->guard->gh->genAiExecutionContextMap);
if (!llmContextStatus.ok()) {
return llmContextStatus;
}
for (auto& name : this->outputNames) {
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the queue path for unary inference, input side packets from the request are currently not rejected (only the streaming queue path checks requestHasInputSidePackets). This can silently ignore user-provided side packets while the non-queue path would apply them via StartRun(). Add the same requestHasInputSidePackets check (and return a clear error) in inferWithQueue to keep behavior consistent and avoid surprising clients.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both queue paths have this check. We do not have this limitatin for non-queue graphs.

Comment thread src/mediapipe_internal/mediapipegraphexecutor.hpp
Comment thread src/mediapipe_internal/mediapipegraphdefinition.cpp
@atobiszei atobiszei force-pushed the atobisze_check_graph_pool_2026 branch from 67c1f3b to c849bfa Compare April 27, 2026 09:20
Comment thread docs/mediapipe.md Outdated
Comment thread src/mediapipe_internal/graph_side_packets.hpp Outdated
Comment thread src/mediapipe_internal/mediapipegraphdefinition.cpp Outdated
Comment thread src/mediapipe_internal/mediapipegraphdefinition.hpp Outdated
Comment thread src/mediapipe_internal/mediapipegraphexecutor.hpp Outdated
Comment thread src/mediapipe_internal/mediapipegraphexecutor.hpp Outdated
Comment thread src/test/llm/config_queue.json
Comment thread src/test/mediapipe_framework_test.cpp
Comment thread src/test/mediapipe_framework_test.cpp
Comment thread src/test/stress_test_utils.hpp
Comment thread common_settings.bzl
Comment thread docs/mediapipe.md Outdated
Comment thread docs/mediapipe.md Outdated
Comment thread src/graph_export/graph_export.cpp
Comment thread src/llm/http_llm_calculator.cc
Comment thread src/mediapipe_internal/graph_side_packets.hpp Outdated
Comment thread src/mediapipe_internal/graphqueue.cpp
atobiszei added 4 commits May 20, 2026 11:01
…bled

- Move resolveGraphQueueSize() before dryInitializeTest() in validate() flow
  so the check runs even when calculators are not registered in the binary.
- Add precondition guard: return INTERNAL_ERROR if chosenConfig is empty.
- Reject graphs that combine PythonExecutorCalculator + LOOPBACK stream with
  OVMS_GRAPH_QUEUE_MAX_SIZE > 0 or AUTO. Generative Python nodes hold
  per-request iterator state incompatible with pooled graph instances.
- Add 4 unit tests covering: explicit queue + loopback rejected, AUTO + loopback
  rejected, Python without loopback allowed, loopback with queue disabled allowed.
- Document the incompatibility in docs/mediapipe.md and
  docs/python_support/reference.md.
…ill-switch

- Changed image generation graph queue size from 1 to AUTO (uniform for all graph types)
- Added OVMS_GRAPH_QUEUE_OFF=1 runtime kill-switch env var to disable graph pools
- Removed unused getDefaultGraphQueueSizeDirective and buildGraphHeader parameter
- Added 2 unit tests for env var kill-switch behavior
- Updated docs/mediapipe.md with corrected export_model.py info and kill-switch docs
- Switched from raw std::getenv to project GetEnvVar utility
Comment thread docs/mediapipe.md Outdated
- In `export_model.py`: image generation graphs use `1`, and all other graph types use `AUTO`.
- In OVMS `--task ...` graph export: image generation graphs use `1`, and all other graph types use `AUTO`.
- OVMS `--task ...` graph export emits `# OVMS_GRAPH_QUEUE_MAX_SIZE: AUTO` for all graph types.
- `demos/common/export_models/export_model.py` does not emit this directive (graph pool disabled for graphs created by it).
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Align Python export_model.py graph templates with C++ CLI path.
All 6 templates (text_gen, image_gen, embeddings, rerank, t2s, s2t)
now emit the queue directive.
atobiszei added 2 commits May 21, 2026 13:58
Conflicts resolved:
- graph_side_packets.hpp: keep both <utility> and <vector> includes
- graph_export_test.cpp: keep queue-aware test helpers
- pull_hf_model_test.cpp: use global removeGeneratedGraphHeaders

Additional fixes:
- mediapipegraphdefinition.cpp: sidePacketMaps is shared_ptr, use -> not .
- light_test_utils.hpp: rename removeVersionString to removeGeneratedGraphHeaders
  (strips both version line and queue directive)
- lora_graph_export_test.cpp: update all call sites
@atobiszei atobiszei merged commit 188d323 into main May 21, 2026
1 check passed
porlows1 pushed a commit that referenced this pull request May 28, 2026
The graph queue pre-initializes a pool of MediaPipe graph instances that are reused across requests, avoiding the overhead of creating and destroying graphs per request. Queue size is resolved during graph validation.

Decision logic (in priority order)
Explicit pbtxt directive - Add a comment in the graph .pbtxt file:

# OVMS_GRAPH_QUEUE_SIZE: AUTO - pool sized to std::thread::hardware_concurrency()
# OVMS_GRAPH_QUEUE_SIZE: <N> - pool of exactly N graphs (must be ≤ hardware threads)
# OVMS_GRAPH_QUEUE_SIZE: 0 - explicitly disable pool
This directive is always honored, regardless of env var or calculator type.
Environment variable OVMS_GRAPH_QUEUE_OFF=1 - Suppresses the auto-enable default. Does not override an explicit pbtxt directive.

If there is no comment in pbtxt file nor environment variable set then graph pool is disabled by default.

Python calculator - If the graph contains PythonExecutorCalculator, the pool is automatically disabled (not yet safe for reuse).

Default — If none of the above apply, the pool is enabled with AUTO sizing.

Ticket:CVS-182707
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants