Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions src/spellbind/observable_sequences.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from abc import ABC, abstractmethod
from functools import cached_property
from typing import Sequence, Generic, MutableSequence, Iterable, overload, SupportsIndex, Callable, Iterator, \
TypeVar, Any
TypeVar, Any, Hashable

from typing_extensions import TypeIs, Self, override

Expand All @@ -16,7 +16,7 @@
clear_action, DeltaAction, SimpleRemoveOneAction, SimpleAddOneAction, ElementsChangedAction, \
SimpleOneElementChangedAction
from spellbind.event import ValueEvent
from spellbind.int_values import IntVariable, IntValue
from spellbind.int_values import IntVariable, IntValue, IntConstant
from spellbind.observable_collections import ObservableCollection, ValueCollection
from spellbind.observables import ValueObservable, ValuesObservable, void_value_observable, void_values_observable, \
combine_values_observables, combine_value_observables
Expand All @@ -25,6 +25,7 @@
_S = TypeVar("_S")
_S_co = TypeVar("_S_co", covariant=True)
_T = TypeVar("_T")
_H = TypeVar("_H", bound=Hashable)


class ObservableSequence(Sequence[_S_co], ObservableCollection[_S_co], Generic[_S_co], ABC):
Expand Down Expand Up @@ -973,6 +974,76 @@ def __str__(self) -> str:
return "[]"


class StaticObservableSequence(IndexObservableSequence[_S], Generic[_S]):
_on_change: ValueObservable[AtIndicesDeltasAction[_S] | ClearAction[_S] | ReverseAction[_S]]
_delta_observable: ValuesObservable[AtIndexDeltaAction[_S]]

def __init__(self, iterable: Iterable[_S] = ()) -> None:
self._sequence = tuple(iterable)
self._on_change = void_value_observable()
self._delta_observable = void_values_observable()
self._length_value = IntConstant.of(len(self._sequence))

@property
@override
def on_change(self) -> ValueObservable[AtIndicesDeltasAction[_S] | ClearAction[_S] | ReverseAction[_S]]:
return self._on_change

@property
@override
def delta_observable(self) -> ValuesObservable[AtIndexDeltaAction[_S]]:
return self._delta_observable

@property
@override
def length_value(self) -> IntValue:
return self._length_value

@overload
@override
def __getitem__(self, index: int) -> _S: ...

@overload
@override
def __getitem__(self, index: slice) -> Sequence[_S]: ...

@override
def __getitem__(self, index: int | slice) -> _S | Sequence[_S]:
return self._sequence[index]

@override
def __iter__(self) -> Iterator[_S]:
return iter(self._sequence)

@override
def __len__(self) -> int:
return len(self._sequence)

@override
def __str__(self) -> str:
return "[" + ", ".join(repr(item) for item in self._sequence) + "]"

@override
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._sequence!r})"

@override
def __eq__(self, other: object) -> bool:
if isinstance(other, FrozenObservableSequence):
return self._sequence == other._sequence
return super().__eq__(other)


class FrozenObservableSequence(StaticObservableSequence[_H], Generic[_H]):
def __init__(self, iterable: Iterable[_H] = ()) -> None:
super().__init__(iterable)
self._hash = hash(self._sequence) # ensure fast-fail for non hashable elements, like frozenset does

@override
def __hash__(self) -> int:
return self._hash


EMPTY_SEQUENCE: IndexObservableSequence[Any] = _EmptyObservableSequence()


Expand Down
109 changes: 109 additions & 0 deletions tests/test_collections/test_frozen_sequence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import pytest

from spellbind.observable_sequences import FrozenObservableSequence, ObservableList, StaticObservableSequence


def test_initialize_frozen_sequence_empty_str():
sequence = StaticObservableSequence()
assert str(sequence) == "[]"


def test_initialize_frozen_sequence_empty_is_empty():
sequence = StaticObservableSequence()
assert len(sequence) == 0
assert sequence.length_value.value == 0
assert sequence.length_value.constant_value_or_raise == 0
assert list(sequence) == []


def test_initialize_frozen_sequence_empty_observers_not_observed():
sequence = StaticObservableSequence()
assert not sequence.on_change.is_observed()
assert not sequence.delta_observable.is_observed()

sequence.on_change.observe(lambda _: None)
assert not sequence.on_change.is_observed()

sequence.delta_observable.observe(lambda _: None)
assert not sequence.delta_observable.is_observed()


def test_static_sequence_str():
sequence = StaticObservableSequence([1, 2, 3])
assert str(sequence) == "[1, 2, 3]"


def test_static_sequence_length():
sequence = StaticObservableSequence([1, 2, 3])
assert len(sequence) == 3
assert sequence.length_value.value == 3
assert sequence.length_value.constant_value_or_raise == 3


def test_static_sequence_get_item():
sequence = StaticObservableSequence([1, 2, 3])
assert sequence[0] == 1
assert sequence[1] == 2
assert sequence[2] == 3


def test_static_sequence_iter():
sequence = StaticObservableSequence([1, 2, 3])
assert list(sequence) == [1, 2, 3]


def test_static_sequence_contains():
sequence = StaticObservableSequence([1, 2, 3])
assert 1 in sequence
assert 2 in sequence
assert 3 in sequence
assert 4 not in sequence
assert "test" not in sequence


def test_static_sequence_has_no_append():
sequence = StaticObservableSequence([1, 2, 3])
with pytest.raises(AttributeError):
sequence.append(4)


def test_static_sequence_has_no_remove():
sequence = StaticObservableSequence([1, 2, 3])
with pytest.raises(AttributeError):
sequence.remove(2)


def test_static_sequence_equals_true():
seq1 = StaticObservableSequence([1, 2, 3])
seq2 = StaticObservableSequence([1, 2, 3])
assert seq1 == seq2


def test_static_sequence_equals_false():
seq1 = StaticObservableSequence([1, 2, 3])
seq2 = StaticObservableSequence([1, 2, 4])
assert seq1 != seq2


def test_static_sequence_equals_observable_list_true():
seq1 = StaticObservableSequence([1, 2, 3])
seq2 = ObservableList([1, 2, 3])
assert seq1 == seq2


def test_static_sequence_equals_observable_list_false():
seq1 = StaticObservableSequence([1, 2, 3])
seq2 = ObservableList([1, 2, 4])
assert seq1 != seq2


def test_frozen_sequence_hash_equal():
seq1 = FrozenObservableSequence([1, 2, 3])
seq2 = FrozenObservableSequence([1, 2, 3])
assert hash(seq1) == hash(seq2)


def test_frozen_sequence_hash_not_equal():
seq1 = FrozenObservableSequence([1, 2, 3])
seq2 = FrozenObservableSequence([1, 2, 4])
assert hash(seq1) != hash(seq2)