Skip to content
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ dependencies = [
"bidict>=0.23.1",
"geff>=1.1.3.1.1",
"psygnal>=0.14.0",
# zarr 2.x's util.py imports cbuffer_sizes/cbuffer_metainfo from
# numcodecs.blosc, which numcodecs >= 0.16 removed. On Python 3.10 the
# resolver is forced to numcodecs <= 0.13 (numcodecs >= 0.14 needs
# Python >= 3.11), so zarr 2.18 still imports there. On Python >= 3.11
# nothing prevents the broken pair, so require zarr >= 3 explicitly.
"zarr>=3; python_version >= '3.11'",
]

[project.optional-dependencies]
Expand Down
57 changes: 40 additions & 17 deletions src/tracksdata/graph/_graph_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,11 +448,18 @@ def remove_node(self, node_id: int) -> None:
if node_id not in self._external_to_local:
raise ValueError(f"Node {node_id} does not exist in the graph.")

if is_signal_on(self.node_removed):
# Capture signal state once so a slot connecting mid-call cannot reference
# an unbound `old_attrs`.
view_signal_on = is_signal_on(self.node_removed)
root_signal_on = is_signal_on(self._root.node_removed)
if view_signal_on or root_signal_on:
old_attrs = self.nodes[node_id].to_dict()

# Remove from root graph first, because removing bounding box requires node attrs
self._root.remove_node(node_id)
# Remove from root graph first, because removing bounding box requires node attrs.
# Block root's signal so it doesn't fire while the view is still in old state;
# re-emit at the end after both root and view are consistent.
with self._root.node_removed.blocked():
self._root.remove_node(node_id)

if self.sync:
# Get the local node ID and remove from local graph
Expand All @@ -478,7 +485,9 @@ def remove_node(self, node_id: int) -> None:
else:
self._out_of_sync = True

if is_signal_on(self.node_removed):
if root_signal_on:
self._root.node_removed.emit(node_id, old_attrs)
if view_signal_on:
self.node_removed.emit(node_id, old_attrs)

def add_edge(
Expand Down Expand Up @@ -685,8 +694,11 @@ def update_node_attrs(
else:
node_ids = list(node_ids)

signal_on = is_signal_on(self.node_updated)
if signal_on:
# Capture signal state once so slots connecting mid-call cannot toggle behavior
# between the old/new attr captures or between the two emit blocks.
view_signal_on = is_signal_on(self.node_updated)
root_signal_on = is_signal_on(self._root.node_updated)
if view_signal_on or root_signal_on:
existing_keys = set(self._root.node_attr_keys(return_ids=True))
signal_keys = list(
dict.fromkeys(
Expand All @@ -709,10 +721,13 @@ def update_node_attrs(
.rows_by_key(key=DEFAULT_ATTR_KEYS.NODE_ID, named=True, unique=True, include_key=True)
)

self._root.update_node_attrs(
node_ids=node_ids,
attrs=attrs,
)
# Block root signal so it doesn't fire while the view is still in old state;
# re-emit at the end after both root and view are consistent.
with self._root.node_updated.blocked():
self._root.update_node_attrs(
node_ids=node_ids,
attrs=attrs,
)
# because attributes are passed by reference, we need don't need if both are rustworkx graphs
if not self._is_root_rx_graph:
if self.sync:
Expand All @@ -724,19 +739,27 @@ def update_node_attrs(
else:
self._out_of_sync = True

if signal_on:
if view_signal_on or root_signal_on:
new_attrs_by_id = (
self._root.filter(node_ids=node_ids)
.node_attrs(attr_keys=signal_keys)
.rows_by_key(key=DEFAULT_ATTR_KEYS.NODE_ID, named=True, unique=True, include_key=True)
)
old_attrs_by_id = cast(dict[int, dict[str, Any]], old_attrs_by_id) # for mypy
for node_id in node_ids:
self.node_updated.emit(
node_id,
old_attrs_by_id[node_id],
new_attrs_by_id[node_id],
)
if root_signal_on:
for node_id in node_ids:
self._root.node_updated.emit(
node_id,
old_attrs_by_id[node_id],
new_attrs_by_id[node_id],
)
if view_signal_on:
for node_id in node_ids:
self.node_updated.emit(
node_id,
old_attrs_by_id[node_id],
new_attrs_by_id[node_id],
)

def update_edge_attrs(
self,
Expand Down
84 changes: 84 additions & 0 deletions src/tracksdata/graph/_test/test_graph_view_signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Tests for GraphView signal-emission consistency.

When a node-mutation signal fires through a GraphView, listeners attached to
either the root or the view must see the two graphs in a consistent state.
A listener attached to root that queries the view (or vice versa) must not
observe ghost or stale nodes.
"""

import polars as pl

from tracksdata.constants import DEFAULT_ATTR_KEYS
from tracksdata.graph import BaseGraph


def test_view_node_signals_fire_with_consistent_state(graph_backend: BaseGraph) -> None:
"""add_node / remove_node: when either signal fires (on root or view), the
two graphs must agree on `has_node`.

Today used to fail on `remove_node` because `GraphView.remove_node` does not
block the root signal — root emits while the view's local rx_graph still
holds the node.
"""
graph_backend.add_node_attr_key("x", pl.Float64)
graph_backend.add_node({"t": 0, "x": 0.0})

view = graph_backend.filter().subgraph()
observations: list = []

def make_slot(source: str, signal: str):
def slot(node_id: int, *_args) -> None:
observations.append((source, signal, node_id, graph_backend.has_node(node_id), view.has_node(node_id)))

return slot

graph_backend.node_added.connect(make_slot("root", "added"))
graph_backend.node_removed.connect(make_slot("root", "removed"))
view.node_added.connect(make_slot("view", "added"))
view.node_removed.connect(make_slot("view", "removed"))

new_id = view.add_node({"t": 1, "x": 1.0})
view.remove_node(new_id)

inconsistent = [obs for obs in observations if obs[3] != obs[4]]
detail = "\n".join(
f" {source}.{signal}(node={nid}): root.has_node={rh}, view.has_node={vh}"
for source, signal, nid, rh, vh in inconsistent
)
assert not inconsistent, f"Listener saw root and view in inconsistent state at signal time:\n{detail}"


def test_view_update_node_attrs_signal_fires_with_consistent_value(graph_backend: BaseGraph) -> None:
"""update_node_attrs: when either signal fires, root and view must hold
the same value for the updated attribute.

This used to fail on backends where root and view do not share an attribute
storage (SQLGraph): root emits with the new value while the view's local
rx_graph still holds the old one.
"""
graph_backend.add_node_attr_key("x", pl.Float64)
node_id = graph_backend.add_node({"t": 0, "x": 0.0})

view = graph_backend.filter().subgraph()
observations: list = []

def attr_value(graph: BaseGraph, nid: int) -> float:
df = graph.node_attrs(attr_keys=[DEFAULT_ATTR_KEYS.NODE_ID, "x"])
return df.filter(pl.col(DEFAULT_ATTR_KEYS.NODE_ID) == nid)["x"].item()

def make_slot(source: str):
def slot(nid: int, _old: dict, _new: dict) -> None:
observations.append((source, nid, attr_value(graph_backend, nid), attr_value(view, nid)))

return slot

graph_backend.node_updated.connect(make_slot("root"))
view.node_updated.connect(make_slot("view"))

view.update_node_attrs(attrs={"x": 5.0}, node_ids=[node_id])

inconsistent = [obs for obs in observations if obs[2] != obs[3]]
detail = "\n".join(
f" {source}.node_updated(node={nid}): root.x={rx}, view.x={vx}" for source, nid, rx, vx in inconsistent
)
assert not inconsistent, f"Listener saw root and view holding different attribute values at signal time:\n{detail}"
Loading