-
Notifications
You must be signed in to change notification settings - Fork 19
Implement lazy subscription for Coalesce function #1322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Based on #1295 |
Also use `None` as a default value for span, to avoid having to type it when constructing higher level formulas. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Also rename it to AstNode This way, nodes can be accessed by the _function.py module, without circular imports. That would enable functions to handle their own arguments. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
This would enable special forms like `coalesce` to fetch only what's necessary. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
This changes `AstNode.evaluate()` to return `Sample[QuantityT] | QuantityT | None` instead of `float | None`. This makes timestamps available to coalesce node, so it knows how to synchronize newly started fallback streams with the primary streams. This also requires a `create_method` to be passed to TelemetryStream, for accurate typing, so the `Quantity` types from the resampler don't get sent out as they are, in case of simple formulas, because there is no top-level re-wrapping of the values in the formula evaluator anymore. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Also update `Function.__call__()` to async. This would make it possible for `coalesce` to start async streams as necessary during evaluation. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Earlier we were subscribing to all streams, even if they were going to be discarded in a coalesce. This commit makes further progress towards implementing lazy coalesce. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
This is where we start to see the limitations of the string formula spec. For a power formula, we treat the constant in: - `frequenz-floss#10 + 100.0` as a power value in watts. - `frequenz-floss#10 / 100.0` as a float value. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
With this subscription can be delegated to individual nodes, allowing coalesce to subscribe to only the interesting stuff. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
And drop the old centralized `synchronize_receivers` method, delegating synchronization to individual nodes. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Now that individual nodes control telemetry subscriptions, the order in which subscriptions happen is not deterministic. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
This will be needed for ongoing operation of `coalesce` nodes. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
This enables us to import _ast from _functions, without it being a circular dependency. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Most of these are provided by the rust component graph now. The `find_first_descendant_component` function needs to be replaced by the coalesce formulas of the component graph, but not done yet. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
|
No release notes because this is an improvement to an unreleased feature, which will stop being a new feature, after its api is changed to be compatible with the old one. |
llucax
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM in general, just a few minor comments.
| io_pairs: list[tuple[list[float | None], float | None]], | ||
| ) -> None: | ||
| """Run a formula test.""" | ||
| _logger.debug("TESTING FORMULA: %s", formula_str) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll get this for free when using parametrize 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements lazy subscription for the Coalesce function in the formula engine. The key change is that Coalesce now subscribes to its parameters progressively: it starts by subscribing only to the first parameter, and if that returns None, it subscribes to the next parameter, and so on. This optimizes performance by avoiding unnecessary subscriptions when earlier parameters provide non-None values.
Key changes:
- Converted formula evaluation from synchronous to asynchronous throughout the codebase
- Implemented lazy subscription mechanism for
Coalescefunction - Refactored AST nodes to support async evaluation and subscription management
- Changed
ResampledStreamFetcher.fetch_stream()to be async and subscribe immediately rather than batching subscriptions
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
tests/timeseries/_formulas/utils.py |
Made get_resampled_stream async to match new async API |
tests/timeseries/_formulas/test_formulas_3_phase.py |
Updated tests for async behavior, added event loop policy fixture |
tests/timeseries/_formulas/test_formulas.py |
Enhanced tests with component IDs list, added debug logging, updated for async |
tests/timeseries/_formulas/test_formula_composition.py |
Made test helper async, added extra receives for lazy subscription behavior |
tests/microgrid/test_grid.py |
Made get_resampled_stream calls async |
src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py |
Updated documentation for formula syntax |
src/frequenz/sdk/timeseries/formulas/_token.py |
Added docstring for Component.id field |
src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py |
Made fetch_stream async, removed batch subscription logic |
src/frequenz/sdk/timeseries/formulas/_parser.py |
Updated to create async metric fetchers, changed constant creation |
src/frequenz/sdk/timeseries/formulas/_functions.py |
Implemented lazy subscription in Coalesce, made all functions async |
src/frequenz/sdk/timeseries/formulas/_formula_evaluator.py |
Simplified evaluator to use async AST node evaluation |
src/frequenz/sdk/timeseries/formulas/_formula_3_phase_evaluator.py |
Updated for async evaluation with synchronizer |
src/frequenz/sdk/timeseries/formulas/_formula.py |
Added metric_fetcher helper, updated FormulaBuilder for async |
src/frequenz/sdk/timeseries/formulas/_base_ast_node.py |
New file with AstNode base class and NodeSynchronizer |
src/frequenz/sdk/timeseries/formulas/_ast.py |
Complete rewrite to support async evaluation and subscriptions |
src/frequenz/sdk/_internal/_graph_traversal.py |
Removed unused helper functions |
pyproject.toml |
Removed unused types-networkx dependency |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/frequenz/sdk/timeseries/formulas/_formula_3_phase_evaluator.py
Outdated
Show resolved
Hide resolved
|
Oh, I forgot about a few minor comments on commit messages:
|
multiply and divide are not needed for the automatically generated formulas. I can't think of a usecase where people would need to divide by a quantity. So I think we don't need to worry about it yet. |
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
|
I've put the review comment changes in a single commit. Don't want to change the commit ID, because I've pasted them in the previous PR, don't want to invalidate them. Will remember to not make lower case commit messages next time. 🤞🏽 |
Yeah, no problem, I agree at some point we need to relax as rebasing within big PRs it very costly. Balancing is good. |
llucax
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉
No description provided.