Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/forge_loop/_testing/task_saga_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Test fake for task saga storage."""

from __future__ import annotations

from dataclasses import dataclass, field

from forge_loop.tasks.saga import TaskSaga


@dataclass
class FakeTaskSagaStore:
sagas: dict[str, TaskSaga] = field(default_factory=dict)

def put(self, saga: TaskSaga) -> TaskSaga:
self.sagas[saga.task_id] = saga
return saga

def get(self, task_id: str) -> TaskSaga | None:
return self.sagas.get(task_id)

def list_in_flight(self) -> tuple[TaskSaga, ...]:
return tuple(saga for saga in self.sagas.values() if not saga.is_terminal)
155 changes: 152 additions & 3 deletions src/forge_loop/control/boot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,77 @@

from __future__ import annotations

from dataclasses import dataclass
from collections.abc import Mapping
from dataclasses import dataclass, field
from pathlib import Path
from typing import Protocol

from forge_loop.eventlog import ProjectionCursor
from forge_loop.frontier import FrontierCursor
from forge_loop.memory import MemoryItem, MemoryKind
from forge_loop.tasks import TaskSaga


class BootContextError(RuntimeError):
"""Raised when required durable boot state cannot be loaded."""


class BootFrontierStore(Protocol):
"""Durable frontier source required for maestro boot."""

def load(self) -> FrontierCursor:
"""Load the current frontier cursor."""
...


class BootEventLog(Protocol):
"""Event-log reads needed during boot assembly."""

def latest_sequence(self) -> int:
"""Return the highest event sequence, or 0 when empty."""
...

def list_projection_cursors(self) -> Mapping[str, ProjectionCursor]:
"""Return saved projection cursors by projection name."""
...


class BootMemoryStore(Protocol):
"""Memory reads needed during boot assembly."""

def list_active(self, *, kind: MemoryKind | None = None) -> tuple[MemoryItem, ...]:
"""Return non-superseded memory items in insertion order."""
...

def list_rejected_paths(self) -> tuple[MemoryItem, ...]:
"""Return active memory items tagged as rejected paths."""
...


class BootTaskStore(Protocol):
"""Task saga reads needed during boot assembly."""

def list_in_flight(self) -> tuple[TaskSaga, ...]:
"""Return non-terminal task sagas in insertion order."""
...


@dataclass(frozen=True)
class ProjectionStatus:
"""Boot-time position of one durable projection."""

sequence: int
lag: int


@dataclass(frozen=True)
class BootSources:
"""Durable stores used to assemble reset recovery context."""

frontier_store: BootFrontierStore
event_log: BootEventLog
memory_store: BootMemoryStore | None = None
task_store: BootTaskStore | None = None


@dataclass(frozen=True)
Expand All @@ -13,15 +81,96 @@ class BootContext:

frontier: FrontierCursor
active_memory_ids: tuple[str, ...] = ()
rejected_path_memory_ids: tuple[str, ...] = ()
in_flight_task_ids: tuple[str, ...] = ()
in_flight_saga_ids: tuple[str, ...] = ()
latest_event_sequence: int = 0
last_event_sequence: int = 0
projection_cursors: Mapping[str, ProjectionStatus] = field(default_factory=dict)

def __post_init__(self) -> None:
sequence = self.latest_event_sequence or self.last_event_sequence
if (
self.latest_event_sequence
and self.last_event_sequence
and self.latest_event_sequence != self.last_event_sequence
):
raise ValueError("latest_event_sequence and last_event_sequence differ")
object.__setattr__(self, "latest_event_sequence", sequence)
object.__setattr__(self, "last_event_sequence", sequence)

def summary(self) -> str:
"""Human-readable reset context for logs, prompts, and status views."""
lines = [self.frontier.boot_summary()]
if self.active_memory_ids:
lines.append("memory: " + ", ".join(self.active_memory_ids))
if self.rejected_path_memory_ids:
lines.append("rejected_memory: " + ", ".join(self.rejected_path_memory_ids))
if self.in_flight_task_ids:
lines.append("in_flight: " + ", ".join(self.in_flight_task_ids))
lines.append(f"event_sequence: {self.last_event_sequence}")
if self.in_flight_saga_ids:
pairs = (
f"{task_id}/{saga_id}"
for task_id, saga_id in zip(
self.in_flight_task_ids,
self.in_flight_saga_ids,
strict=True,
)
)
lines.append("in_flight: " + ", ".join(pairs))
else:
lines.append("in_flight: " + ", ".join(self.in_flight_task_ids))
lines.append(f"event_sequence: {self.latest_event_sequence}")
if self.projection_cursors:
projections = [
f"{name}@{status.sequence} lag={status.lag}"
for name, status in sorted(self.projection_cursors.items())
]
lines.append("projections: " + ", ".join(projections))
return "\n".join(lines)


def assemble_boot_context(sources: BootSources) -> BootContext:
"""Assemble compact maestro reset context from durable stores."""

try:
frontier = sources.frontier_store.load()
except FileNotFoundError as exc:
path = getattr(sources.frontier_store, "path", None)
location = f" at {Path(path)}" if path is not None else ""
raise BootContextError(f"frontier state is required{location}") from exc
except ValueError as exc:
raise BootContextError(f"frontier state is invalid: {exc}") from exc

latest_event_sequence = sources.event_log.latest_sequence()
active_memory_ids: tuple[str, ...] = ()
rejected_path_memory_ids: tuple[str, ...] = ()
if sources.memory_store is not None:
active_memory_ids = tuple(item.memory_id for item in sources.memory_store.list_active())
rejected_path_memory_ids = tuple(
item.memory_id for item in sources.memory_store.list_rejected_paths()
)

in_flight_task_ids: tuple[str, ...] = ()
in_flight_saga_ids: tuple[str, ...] = ()
if sources.task_store is not None:
in_flight = sources.task_store.list_in_flight()
in_flight_task_ids = tuple(saga.task_id for saga in in_flight)
in_flight_saga_ids = tuple(saga.saga_id for saga in in_flight)

projection_cursors = {
name: ProjectionStatus(
sequence=cursor.sequence,
lag=max(latest_event_sequence - cursor.sequence, 0),
)
for name, cursor in sources.event_log.list_projection_cursors().items()
}

return BootContext(
frontier=frontier,
active_memory_ids=active_memory_ids,
rejected_path_memory_ids=rejected_path_memory_ids,
in_flight_task_ids=in_flight_task_ids,
in_flight_saga_ids=in_flight_saga_ids,
latest_event_sequence=latest_event_sequence,
projection_cursors=projection_cursors,
)
20 changes: 20 additions & 0 deletions src/forge_loop/eventlog/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ def since(self, sequence: int = 0) -> Iterable[EventEnvelope]:
)
return (self._envelope_from_row(row) for row in rows)

def latest_sequence(self) -> int:
"""Return the highest event sequence, or 0 when the log is empty."""

row = self._connection.execute("SELECT MAX(sequence) AS sequence FROM events").fetchone()
if row is None or row["sequence"] is None:
return 0
return int(row["sequence"])

def get_projection_cursor(self, projection_name: str) -> ProjectionCursor:
"""Return the saved projection cursor, or sequence 0 when absent."""

Expand Down Expand Up @@ -170,6 +178,18 @@ def set_projection_cursor(
(projection_name, cursor.sequence),
)

def list_projection_cursors(self) -> Mapping[str, ProjectionCursor]:
"""Return saved projection cursors by projection name."""

rows = self._connection.execute(
"""
SELECT projection_name, sequence
FROM projection_cursors
ORDER BY projection_name ASC
"""
)
return {row["projection_name"]: ProjectionCursor(sequence=row["sequence"]) for row in rows}

def _find_by_idempotency_key(self, idempotency_key: str) -> EventEnvelope | None:
row = self._connection.execute(
"""
Expand Down
42 changes: 42 additions & 0 deletions src/forge_loop/eventlog/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Any, Protocol

from forge_loop.eventlog.models import EventEnvelope, EventId, EventKind
from forge_loop.eventlog.projections import ProjectionCursor


class EventLog(Protocol):
Expand All @@ -28,9 +29,31 @@ def append(
idempotency_key: str | None = None,
) -> EventEnvelope:
"""Append one event and return its durable envelope."""
...

def since(self, sequence: int = 0) -> Iterable[EventEnvelope]:
"""Yield events with sequence greater than ``sequence``."""
...

def latest_sequence(self) -> int:
"""Return the highest durable event sequence, or 0 when empty."""
...

def get_projection_cursor(self, projection_name: str) -> ProjectionCursor:
"""Return one projection cursor, or sequence 0 when absent."""
...

def set_projection_cursor(
self,
projection_name: str,
cursor: ProjectionCursor,
) -> None:
"""Persist a projection cursor."""
...

def list_projection_cursors(self) -> Mapping[str, ProjectionCursor]:
"""Return all saved projection cursors by projection name."""
...


@dataclass
Expand All @@ -42,6 +65,7 @@ class InMemoryEventLog:
"""

_events: list[EventEnvelope] = field(default_factory=list)
_projection_cursors: dict[str, ProjectionCursor] = field(default_factory=dict)

def append(
self,
Expand All @@ -66,3 +90,21 @@ def append(

def since(self, sequence: int = 0) -> Iterable[EventEnvelope]:
return (event for event in self._events if event.sequence > sequence)

def latest_sequence(self) -> int:
if not self._events:
return 0
return self._events[-1].sequence

def get_projection_cursor(self, projection_name: str) -> ProjectionCursor:
return self._projection_cursors.get(projection_name, ProjectionCursor(sequence=0))

def set_projection_cursor(
self,
projection_name: str,
cursor: ProjectionCursor,
) -> None:
self._projection_cursors[projection_name] = cursor

def list_projection_cursors(self) -> Mapping[str, ProjectionCursor]:
return dict(self._projection_cursors)
9 changes: 8 additions & 1 deletion src/forge_loop/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
"""Task saga lifecycle contracts."""

from forge_loop.tasks.saga import Compensation, TaskSaga, TaskState
from forge_loop.tasks.store import SqliteTaskSagaStore, TaskSagaStore

__all__ = ["Compensation", "TaskSaga", "TaskState"]
__all__ = [
"Compensation",
"SqliteTaskSagaStore",
"TaskSaga",
"TaskSagaStore",
"TaskState",
]
Loading
Loading