Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c1a4668
Use keyword-only arguments for AST node dataclasses
shsms Dec 2, 2025
a7dc25c
Extract Node base class to separate module
shsms Dec 3, 2025
5e320a9
Update functions to fetch their own arguments
shsms Dec 3, 2025
97510c7
Propagate Sample types through AST evaluation
shsms Dec 3, 2025
f888367
Make AST `evaluate` methods async
shsms Dec 3, 2025
2075ad3
Allow `TelemetryStream` AST nodes to subscribe to data on demand
shsms Dec 4, 2025
a37a75a
Return `Sample` after dividing QuantityT by `float`
shsms Dec 8, 2025
187cf0e
Add subscribe method to AST nodes for stream initialization
shsms Dec 9, 2025
720748b
Enable resend_latest on resampled stream channels
shsms Dec 9, 2025
c376dec
add NodeSynchronizer for coordinating AST node evaluation
shsms Dec 9, 2025
93e5f33
Use NodeSynchronizer for concurrent node evaluation
shsms Dec 9, 2025
95c61ec
Update formula tests to not expect channels to be created in order
shsms Dec 9, 2025
d272263
Add an unsubscribe method to AST nodes
shsms Dec 10, 2025
7c2cf3c
Handle missing timestamps gracefully in formula evaluator
shsms Dec 11, 2025
298df08
Update NodeSynchronizer to support syncing to the first node
shsms Dec 11, 2025
631e5d2
move FunCall class from _ast to _functions module
shsms Dec 11, 2025
a06702b
Implement lazy subscription for Coalesce function
shsms Dec 11, 2025
3131305
Remove commented-out code in formula evaluator
shsms Dec 11, 2025
04bc40b
Remove unused telemetry stream tracking in formula evaluator
shsms Dec 12, 2025
a08a925
remove unused graph traversal methods
shsms Dec 12, 2025
5b1a7cf
Remove unused types-networkx from dev-mypy dependencies
shsms Dec 12, 2025
1ce6bcf
Remove unnecessary pylint exception in ComponentGraph instantiation
shsms Dec 12, 2025
bb9423c
Improve formula syntax docs for `from_string` method
shsms Dec 12, 2025
71b3215
Make `metric_fetcher` a required argument for `TelemetryStream`
shsms Dec 12, 2025
1cf3d4b
Add docstrings for class attributes in AST nodes
shsms Dec 12, 2025
9b02348
Apply suggestions from review comments
shsms Dec 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ dev-mypy = [
"types-Markdown == 3.10.0.20251106",
"types-protobuf == 6.32.1.20251105",
"types-setuptools == 80.9.0.20250822",
"types-networkx == 3.6.0.20251127",
# For checking the noxfile, docs/ script, and tests
"frequenz-sdk[dev-mkdocs,dev-noxfile,dev-pytest]",
]
Expand Down
166 changes: 0 additions & 166 deletions src/frequenz/sdk/_internal/_graph_traversal.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,182 +6,16 @@
from __future__ import annotations

from collections.abc import Iterable
from typing import Callable

from frequenz.client.common.microgrid.components import ComponentId
from frequenz.client.microgrid.component import (
BatteryInverter,
Chp,
Component,
ComponentConnection,
EvCharger,
GridConnectionPoint,
SolarInverter,
)
from frequenz.microgrid_component_graph import ComponentGraph, InvalidGraphError


def is_pv_inverter(component: Component) -> bool:
"""Check if the component is a PV inverter.

Args:
component: The component to check.

Returns:
`True` if the component is a PV inverter, `False` otherwise.
"""
return isinstance(component, SolarInverter)


def is_battery_inverter(component: Component) -> bool:
"""Check if the component is a battery inverter.

Args:
component: The component to check.

Returns:
`True` if the component is a battery inverter, `False` otherwise.
"""
return isinstance(component, BatteryInverter)


def is_chp(component: Component) -> bool:
"""Check if the component is a CHP.

Args:
component: The component to check.

Returns:
`True` if the component is a CHP, `False` otherwise.
"""
return isinstance(component, Chp)


def is_ev_charger(component: Component) -> bool:
"""Check if the component is an EV charger.

Args:
component: The component to check.

Returns:
`True` if the component is an EV charger, `False` otherwise.
"""
return isinstance(component, EvCharger)


def is_battery_chain(
graph: ComponentGraph[Component, ComponentConnection, ComponentId],
component: Component,
) -> bool:
"""Check if the specified component is part of a battery chain.

A component is part of a battery chain if it is either a battery inverter or a
battery meter.

Args:
graph: The component graph.
component: component to check.

Returns:
Whether the specified component is part of a battery chain.
"""
return is_battery_inverter(component) or graph.is_battery_meter(component.id)


def is_pv_chain(
graph: ComponentGraph[Component, ComponentConnection, ComponentId],
component: Component,
) -> bool:
"""Check if the specified component is part of a PV chain.

A component is part of a PV chain if it is either a PV inverter or a PV
meter.

Args:
graph: The component graph.
component: component to check.

Returns:
Whether the specified component is part of a PV chain.
"""
return is_pv_inverter(component) or graph.is_pv_meter(component.id)


def is_ev_charger_chain(
graph: ComponentGraph[Component, ComponentConnection, ComponentId],
component: Component,
) -> bool:
"""Check if the specified component is part of an EV charger chain.

A component is part of an EV charger chain if it is either an EV charger or an
EV charger meter.

Args:
graph: The component graph.
component: component to check.

Returns:
Whether the specified component is part of an EV charger chain.
"""
return is_ev_charger(component) or graph.is_ev_charger_meter(component.id)


def is_chp_chain(
graph: ComponentGraph[Component, ComponentConnection, ComponentId],
component: Component,
) -> bool:
"""Check if the specified component is part of a CHP chain.

A component is part of a CHP chain if it is either a CHP or a CHP meter.

Args:
graph: The component graph.
component: component to check.

Returns:
Whether the specified component is part of a CHP chain.
"""
return is_chp(component) or graph.is_chp_meter(component.id)


def dfs(
graph: ComponentGraph[Component, ComponentConnection, ComponentId],
current_node: Component,
visited: set[Component],
condition: Callable[[Component], bool],
) -> set[Component]:
"""
Search for components that fulfill the condition in the Graph.

DFS is used for searching the graph. The graph traversal is stopped
once a component fulfills the condition.

Args:
graph: The component graph.
current_node: The current node to search from.
visited: The set of visited nodes.
condition: The condition function to check for.

Returns:
A set of component ids where the corresponding components fulfill
the condition function.
"""
if current_node in visited:
return set()

visited.add(current_node)

if condition(current_node):
return {current_node}

component: set[Component] = set()

for successor in graph.successors(current_node.id):
component.update(dfs(graph, successor, visited, condition))

return component


def find_first_descendant_component(
graph: ComponentGraph[Component, ComponentConnection, ComponentId],
*,
Expand Down
Loading