Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/app_model/expressions/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Context(ChainMap):
"""Evented Mapping of keys to values."""

changed = Signal(set) # Set[str]
"""Event emitted (with a set of changed keys) whenever the context is changed."""

def __init__(self, *maps: MutableMapping) -> None:
super().__init__(*maps)
Expand Down Expand Up @@ -63,6 +64,56 @@ def new_child(self, m: MutableMapping | None = None) -> Context:
def __hash__(self) -> int:
return id(self)

# ChainMap overrides update/clear/pop/popitem/__ior__ to operate on
# `self.maps[0]` directly, bypassing our `__setitem__`/`__delitem__`
# (and therefore signal emission). We re-override them here to route
# through the evented dunder methods, and aggregate per-key signals
# into a single `changed` emission via `buffered_changes`.
# Hidden from type checkers so callers see the stricter inherited
# signatures.
if not TYPE_CHECKING:

def update(self, *args: Any, **kwargs: Any) -> None:
with self.buffered_changes():
return super().update(*args, **kwargs)

def clear(self) -> None:
# super().clear() would call self.maps[0].clear(), skipping
# __delitem__ entirely.
with self.buffered_changes():
for k in list(self.maps[0]):
del self[k]

def pop(self, key: str, *args: Any) -> Any:
# super().pop() goes straight to self.maps[0].pop(...), so no
# signal would fire. Route through __delitem__ instead.
if key in self.maps[0]:
val = self.maps[0][key]
del self[key]
return val
if args:
return args[0]
raise KeyError(f"Key not found in the first mapping: {key!r}")

def popitem(self) -> tuple[str, Any]:
# super().popitem() also bypasses __delitem__. Pick the same
# entry CPython's dict.popitem would (the last-inserted key)
# and remove it via __delitem__ to fire a signal.
try:
key = next(reversed(self.maps[0]))
except StopIteration:
raise KeyError("No keys found in the first mapping.") from None
val = self.maps[0][key]
del self[key]
return key, val

def __ior__(self, other: Any) -> Context:
# ChainMap.__ior__ calls self.maps[0].update(other), bypassing
# both __setitem__ and our update() override. Defer to our
# update() so the buffered single-signal behavior applies.
self.update(other)
return self


# note: it seems like WeakKeyDictionary would be a nice match here, but
# it appears that the object somehow isn't initialized "enough" to register
Expand Down
180 changes: 180 additions & 0 deletions tests/test_context/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,183 @@ def test_context_events() -> None:
mock4.reset_mock()
root3e["e"] = 1
assert mock4.call_args[0][0] == {"e"}


@pytest.fixture
def ctx_and_mock() -> tuple[Context, Mock]:
ctx = Context()
mock = Mock()
ctx.changed.connect(mock)
return ctx, mock


def test_update_positional_dict(ctx_and_mock: tuple[Context, Mock]) -> None:
ctx, mock = ctx_and_mock
ctx.update({"a": 1, "b": 2})
assert dict(ctx) == {"a": 1, "b": 2}
mock.assert_called_once_with({"a", "b"})


def test_update_kwargs(ctx_and_mock: tuple[Context, Mock]) -> None:
ctx, mock = ctx_and_mock
ctx.update(a=1, b=2)
assert dict(ctx) == {"a": 1, "b": 2}
mock.assert_called_once_with({"a", "b"})


def test_update_iterable_of_pairs(ctx_and_mock: tuple[Context, Mock]) -> None:
ctx, mock = ctx_and_mock
ctx.update([("a", 1), ("b", 2)])
assert dict(ctx) == {"a": 1, "b": 2}
mock.assert_called_once_with({"a", "b"})


def test_update_dict_and_kwargs(ctx_and_mock: tuple[Context, Mock]) -> None:
ctx, mock = ctx_and_mock
ctx.update({"a": 1}, b=2, c=3)
assert dict(ctx) == {"a": 1, "b": 2, "c": 3}
mock.assert_called_once_with({"a", "b", "c"})


def test_clear_emits_one_signal_with_all_keys(
ctx_and_mock: tuple[Context, Mock],
) -> None:
ctx, mock = ctx_and_mock
ctx.update({"a": 1, "b": 2})
mock.assert_called_once_with({"a", "b"})
mock.reset_mock()

ctx.clear()
assert dict(ctx) == {}
mock.assert_called_once_with({"a", "b"})


def test_clear_empty_emits_nothing(ctx_and_mock: tuple[Context, Mock]) -> None:
ctx, mock = ctx_and_mock
ctx.clear()
mock.assert_not_called()


def test_clear_only_clears_first_map() -> None:
"""ChainMap.clear semantics: only maps[0] is cleared; parents untouched."""
root = Context()
root["from_parent"] = "p"
child = root.new_child()
child["own"] = "c"

mock = Mock()
child.changed.connect(mock)
child.clear()

assert "own" not in child.maps[0]
# parent keys are still visible through the chain
assert child["from_parent"] == "p"
assert root["from_parent"] == "p"
mock.assert_called_once_with({"own"})


def test_pop_existing_key() -> None:
ctx = Context()
ctx["a"] = 1
mock = Mock()
ctx.changed.connect(mock)

assert ctx.pop("a") == 1
assert "a" not in ctx
mock.assert_called_once_with({"a"})


def test_pop_missing_with_default(ctx_and_mock: tuple[Context, Mock]) -> None:
ctx, mock = ctx_and_mock
assert ctx.pop("missing", "default") == "default"
mock.assert_not_called()


def test_pop_missing_without_default_raises(
ctx_and_mock: tuple[Context, Mock],
) -> None:
ctx, mock = ctx_and_mock
with pytest.raises(KeyError, match="Key not found in the first mapping"):
ctx.pop("missing")
mock.assert_not_called()


def test_pop_parent_only_key_raises() -> None:
"""ChainMap.pop only operates on maps[0]."""
root = Context()
root["only_in_parent"] = 1
child = root.new_child()

with pytest.raises(KeyError, match="Key not found in the first mapping"):
child.pop("only_in_parent")
assert root["only_in_parent"] == 1


def test_popitem() -> None:
ctx = Context()
ctx["a"] = 1
mock = Mock()
ctx.changed.connect(mock)

key, val = ctx.popitem()
assert (key, val) == ("a", 1)
assert "a" not in ctx
mock.assert_called_once_with({"a"})


def test_popitem_empty_raises(ctx_and_mock: tuple[Context, Mock]) -> None:
ctx, mock = ctx_and_mock
with pytest.raises(KeyError, match="No keys found in the first mapping"):
ctx.popitem()
mock.assert_not_called()


def test_setdefault_new_key(ctx_and_mock: tuple[Context, Mock]) -> None:
ctx, mock = ctx_and_mock
assert ctx.setdefault("a", 1) == 1
assert ctx["a"] == 1
mock.assert_called_once_with({"a"})


def test_setdefault_existing_key_no_signal() -> None:
ctx = Context()
ctx["a"] = 1
mock = Mock()
ctx.changed.connect(mock)

assert ctx.setdefault("a", 99) == 1
assert ctx["a"] == 1
mock.assert_not_called()


def test_ior_emits_one_signal(ctx_and_mock: tuple[Context, Mock]) -> None:
ctx, mock = ctx_and_mock
ctx |= {"a": 1, "b": 2}
assert dict(ctx) == {"a": 1, "b": 2}
mock.assert_called_once_with({"a", "b"})


def test_ior_preserves_identity(ctx_and_mock: tuple[Context, Mock]) -> None:
ctx, _ = ctx_and_mock
before = id(ctx)
ctx |= {"a": 1}
assert id(ctx) == before


def test_bulk_change_propagates_to_descendants() -> None:
"""Bulk changes on a parent should be seen as a single event by descendants."""
root = Context()
child = root.new_child()
grandchild = child.new_child()

mock = Mock()
grandchild.changed.connect(mock)

root.update({"x": 1, "y": 2})
mock.assert_called_once_with({"x", "y"})

mock.reset_mock()
root["x"] = 1 # no actual change in value
root["y"] = 2
root.clear()
mock.assert_called_once_with({"x", "y"})
Loading