Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/forge_loop/eventlog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

from forge_loop.eventlog.legacy_mirror import LegacyEventMirror, LegacyRunnerEventKind
from forge_loop.eventlog.models import EventEnvelope, EventId, EventKind, EventRef
from forge_loop.eventlog.projections import ProjectionCursor
from forge_loop.eventlog.projections import (
ProjectionCursor,
ProjectionReplayError,
replay_projection,
)
from forge_loop.eventlog.sqlite import SqliteEventLog
from forge_loop.eventlog.store import EventLog, InMemoryEventLog

Expand All @@ -22,5 +26,7 @@
"LegacyEventMirror",
"LegacyRunnerEventKind",
"ProjectionCursor",
"ProjectionReplayError",
"SqliteEventLog",
"replay_projection",
]
41 changes: 41 additions & 0 deletions src/forge_loop/eventlog/projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@

from __future__ import annotations

from collections.abc import Iterable
from dataclasses import dataclass
from typing import Protocol

from forge_loop.eventlog.models import EventEnvelope


class ProjectionReplayError(RuntimeError):
"""Raised when replay or cursor advancement would make a projection unsafe."""


@dataclass(frozen=True)
class ProjectionCursor:
"""Where a projection last caught up to the durable event log."""
Expand All @@ -22,3 +27,39 @@ class Projection(Protocol):

def apply(self, event: EventEnvelope) -> None:
"""Apply one event. Implementations should be idempotent by sequence."""


class ProjectionEventLog(Protocol):
"""Event-log operations needed for deterministic projection replay."""

def since(self, sequence: int = 0) -> Iterable[EventEnvelope]:
"""Yield events with sequence greater than ``sequence`` in log order."""
...

def advance_projection_cursor(
self,
projection_name: str,
cursor: ProjectionCursor,
) -> None:
"""Persist a cursor only when it is monotonic and within the log tail."""
...


Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[sev2/tests] replay_projection is a new public function, but the tests only cover successful replay. Add an adversarial test where projection.apply() raises partway through replay and assert the stored projection cursor remains at its original sequence; that exercises the helper's promised failure behavior instead of only the lower-level cursor guard.

def replay_projection(
event_log: ProjectionEventLog,
projection_name: str,
projection: Projection,
) -> ProjectionCursor:
"""Replay events from a projection cursor and persist the replayed tail.

The cursor advances only after every yielded event is applied. A projection
that receives events out of order should raise before the stored cursor
moves, which keeps restart recovery from blessing a partial replay.
"""

cursor = projection.cursor
for event in event_log.since(cursor.sequence):
projection.apply(event)
cursor = ProjectionCursor(sequence=event.sequence)
event_log.advance_projection_cursor(projection_name, cursor)
return cursor
23 changes: 22 additions & 1 deletion src/forge_loop/eventlog/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from typing import Any

from forge_loop.eventlog.models import EventEnvelope, EventId, EventKind, EventRef
from forge_loop.eventlog.projections import ProjectionCursor
from forge_loop.eventlog.projections import ProjectionCursor, ProjectionReplayError

_SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
Expand Down Expand Up @@ -178,6 +178,27 @@ def set_projection_cursor(
(projection_name, cursor.sequence),
)

def advance_projection_cursor(
self,
projection_name: str,
cursor: ProjectionCursor,
) -> None:
"""Persist a cursor only when it advances monotonically within the log."""

current = self.get_projection_cursor(projection_name)
if cursor.sequence < current.sequence:
raise ProjectionReplayError(
f"stale projection cursor for {projection_name}: "
f"{cursor.sequence} < {current.sequence}"
)
latest = self.latest_sequence()
if cursor.sequence > latest:
raise ProjectionReplayError(
f"projection cursor for {projection_name} is past latest event sequence: "
f"{cursor.sequence} > {latest}"
)
self.set_projection_cursor(projection_name, cursor)

def list_projection_cursors(self) -> Mapping[str, ProjectionCursor]:
"""Return saved projection cursors by projection name."""

Expand Down
29 changes: 28 additions & 1 deletion src/forge_loop/eventlog/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from typing import Any, Protocol

from forge_loop.eventlog.models import EventEnvelope, EventId, EventKind
from forge_loop.eventlog.projections import ProjectionCursor
from forge_loop.eventlog.projections import ProjectionCursor, ProjectionReplayError


class EventLog(Protocol):
Expand Down Expand Up @@ -51,6 +51,14 @@ def set_projection_cursor(
"""Persist a projection cursor."""
...

def advance_projection_cursor(
self,
projection_name: str,
cursor: ProjectionCursor,
) -> None:
"""Persist a cursor only when it is monotonic and within the log tail."""
...

def list_projection_cursors(self) -> Mapping[str, ProjectionCursor]:
"""Return all saved projection cursors by projection name."""
...
Expand Down Expand Up @@ -106,5 +114,24 @@ def set_projection_cursor(
) -> None:
self._projection_cursors[projection_name] = cursor

def advance_projection_cursor(
self,
projection_name: str,
cursor: ProjectionCursor,
) -> None:
current = self.get_projection_cursor(projection_name)
if cursor.sequence < current.sequence:
raise ProjectionReplayError(
f"stale projection cursor for {projection_name}: "
f"{cursor.sequence} < {current.sequence}"
)
latest = self.latest_sequence()
if cursor.sequence > latest:
raise ProjectionReplayError(
f"projection cursor for {projection_name} is past latest event sequence: "
f"{cursor.sequence} > {latest}"
)
self.set_projection_cursor(projection_name, cursor)

def list_projection_cursors(self) -> Mapping[str, ProjectionCursor]:
return dict(self._projection_cursors)
9 changes: 8 additions & 1 deletion tests/test_boot_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
MemoryProvenance,
SqliteMemoryStore,
)
from forge_loop.tasks import SqliteTaskSagaStore, TaskSaga, TaskState
from forge_loop.tasks import Compensation, SqliteTaskSagaStore, TaskSaga, TaskState


def _frontier() -> FrontierCursor:
Expand Down Expand Up @@ -133,6 +133,13 @@ def test_boot_context_accepts_no_active_tasks_after_reopen(tmp_path: Path) -> No
saga_id="saga-165-failed",
state=TaskState.FAILED,
issue=165,
compensations=(
Compensation(
kind="cleanup-worktree",
target="/tmp/wt-loop-165",
reason="failed terminal tasks must keep their cleanup record",
),
),
)
)

Expand Down
Loading
Loading