Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/forge_loop/_testing/memory_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Test fake for curated memory storage."""

from __future__ import annotations

from dataclasses import dataclass, field, replace

from forge_loop.memory.models import REJECTED_PATH_TAG, MemoryItem, MemoryKind


@dataclass
class FakeMemoryStore:
items: dict[str, MemoryItem] = field(default_factory=dict)

def put(self, item: MemoryItem) -> MemoryItem:
self.items[item.memory_id] = item
return item

def get(self, memory_id: str) -> MemoryItem | None:
return self.items.get(memory_id)

def list_active(self, *, kind: MemoryKind | None = None) -> tuple[MemoryItem, ...]:
return tuple(
item
for item in self.items.values()
if item.is_active and (kind is None or item.kind is kind)
)

def list_rejected_paths(self) -> tuple[MemoryItem, ...]:
return tuple(item for item in self.list_active() if REJECTED_PATH_TAG in item.tags)

def supersede(self, memory_id: str, *, by_memory_id: str) -> MemoryItem:
if by_memory_id not in self.items:
raise KeyError(by_memory_id)
item = self.items.get(memory_id)
if item is None:
raise KeyError(memory_id)
superseded = replace(item, superseded_by=by_memory_id)
self.items[memory_id] = superseded
return superseded
25 changes: 23 additions & 2 deletions src/forge_loop/control/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from forge_loop.control.boot import BootContext
from forge_loop.frontier import FrontierCursor, FrontierStore
from forge_loop.memory import SqliteMemoryStore
from forge_loop.worker_sessions import WorkerSessionStore, recoverable_sessions


Expand All @@ -24,7 +25,7 @@ def collect_control_plane_status(
runner_state_dir = state_dir or repo / "docs" / "ops"
event_log_path = forge_dir / "events.db"
frontier_path = forge_dir / "frontier.yaml"
memory_path = forge_dir / "memory.yaml"
memory_path = forge_dir / "memory.db"
tasks_path = runner_state_dir / "worker-sessions.db"

event_log, projections, last_sequence = _event_log_status(event_log_path)
Expand Down Expand Up @@ -133,7 +134,27 @@ def _unavailable_frontier(path: Path) -> dict[str, Any]:


def _memory_status(path: Path) -> tuple[dict[str, Any], tuple[str, ...]]:
return _unavailable_memory(path), ()
if not path.exists():
return _unavailable_memory(path), ()

try:
store = SqliteMemoryStore(path)
active = store.list_active()
rejected = store.list_rejected_paths()
except (OSError, sqlite3.Error, ValueError) as exc:
status = _unavailable_memory(path)
status["error"] = str(exc)
return status, ()

return (
{
"available": True,
"path": str(path),
"active_count": len(active),
"rejected_count": len(rejected),
},
tuple(item.memory_id for item in active),
)


def _unavailable_memory(path: Path) -> dict[str, Any]:
Expand Down
17 changes: 15 additions & 2 deletions src/forge_loop/memory/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
"""Curated long-term project memory contracts."""

from forge_loop.memory.models import MemoryItem, MemoryKind, MemoryProvenance
from forge_loop.memory.models import (
REJECTED_PATH_TAG,
MemoryItem,
MemoryKind,
MemoryProvenance,
)
from forge_loop.memory.store import MemoryStore, SqliteMemoryStore

__all__ = ["MemoryItem", "MemoryKind", "MemoryProvenance"]
__all__ = [
"REJECTED_PATH_TAG",
"MemoryItem",
"MemoryKind",
"MemoryProvenance",
"MemoryStore",
"SqliteMemoryStore",
]
14 changes: 8 additions & 6 deletions src/forge_loop/memory/curator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from dataclasses import dataclass

from forge_loop.memory.models import MemoryItem
from forge_loop.memory.store import MemoryStore


@dataclass(frozen=True)
Expand All @@ -22,7 +23,10 @@ class PromotionCandidate:


class MemoryCurator:
"""Minimal curator interface for future durable memory backends."""
"""Memory promotion policy with an optional durable store."""

def __init__(self, store: MemoryStore | None = None) -> None:
self._store = store

def should_promote(self, candidate: PromotionCandidate) -> bool:
"""Return whether a candidate should become durable memory.
Expand All @@ -33,9 +37,7 @@ def should_promote(self, candidate: PromotionCandidate) -> bool:
return bool(candidate.reason_to_remember.strip())

def promote(self, item: MemoryItem) -> MemoryItem:
"""Persist ``item`` in a future backend.

The scaffolding implementation is a pass-through; durable storage lands
after the contracts settle.
"""
"""Persist ``item`` when a durable store is configured."""
if self._store is not None:
return self._store.put(item)
return item
12 changes: 12 additions & 0 deletions src/forge_loop/memory/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ class MemoryKind(StrEnum):
PROCEDURAL = "procedural"


REJECTED_PATH_TAG = "rejected-path"


@dataclass(frozen=True)
class MemoryProvenance:
"""Evidence and lifecycle metadata for a durable memory item."""

source_event: EventRef | None
authored_by: str
source_task_ref: str | None = None
confidence: float = 1.0
created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
supersedes: tuple[str, ...] = ()
Expand All @@ -31,6 +35,14 @@ class MemoryProvenance:
def __post_init__(self) -> None:
if not 0 <= self.confidence <= 1:
raise ValueError("memory confidence must be between 0 and 1")
if not self.authored_by.strip():
raise ValueError("memory provenance authored_by must be non-empty")
if self.source_event is None and (
self.source_task_ref is None or not self.source_task_ref.strip()
):
raise ValueError(
"memory provenance must include a source event or source task reference"
)


@dataclass(frozen=True)
Expand Down
224 changes: 224 additions & 0 deletions src/forge_loop/memory/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
"""Durable store for curated project memory."""

from __future__ import annotations

import json
import sqlite3
from collections.abc import Iterable
from datetime import datetime
from pathlib import Path
from typing import Protocol

from forge_loop.eventlog.models import EventId, EventRef
from forge_loop.memory.models import (
REJECTED_PATH_TAG,
MemoryItem,
MemoryKind,
MemoryProvenance,
)

_SCHEMA = """
CREATE TABLE IF NOT EXISTS memory_items (
memory_id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
title TEXT NOT NULL,
body TEXT NOT NULL,
tags_json TEXT NOT NULL,
source_event_id TEXT,
source_sequence INTEGER,
source_task_ref TEXT,
authored_by TEXT NOT NULL,
confidence REAL NOT NULL,
created_at TEXT NOT NULL,
supersedes_json TEXT NOT NULL,
evidence_refs_json TEXT NOT NULL,
superseded_by TEXT
);
"""


class MemoryStore(Protocol):
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[sev2/product] The PR exports a new MemoryStore protocol and adds a FakeMemoryStore contract surface, but the repo has no downstream consumer wired to the store or protocol. Under the no-scaffold-theatre rule, wire this into the maestro/boot memory-loading path in this PR, or quarantine/remove the plug-in surface behind an experimental extra until it has an in-repo consumer.

"""Persistence boundary for curated memory."""

def put(self, item: MemoryItem) -> MemoryItem:
"""Persist ``item`` and return the stored shape."""
...

def get(self, memory_id: str) -> MemoryItem | None:
"""Return one memory item, including superseded items."""
...

def list_active(self, *, kind: MemoryKind | None = None) -> tuple[MemoryItem, ...]:
"""Return non-superseded memory items in insertion order."""
...

def list_rejected_paths(self) -> tuple[MemoryItem, ...]:
"""Return active memory items tagged as rejected paths."""
...

def supersede(self, memory_id: str, *, by_memory_id: str) -> MemoryItem:
"""Mark ``memory_id`` as superseded by ``by_memory_id``."""
...


class SqliteMemoryStore:
"""SQLite-backed durable memory store."""

def __init__(self, path: str | Path) -> None:
self.path = Path(path)
connect_path: str | Path = ":memory:" if str(path) == ":memory:" else self.path
if str(path) != ":memory:":
self.path.parent.mkdir(parents=True, exist_ok=True)
self._connection = sqlite3.connect(connect_path)
self._connection.row_factory = sqlite3.Row
if str(path) != ":memory:":
self._connection.execute("PRAGMA journal_mode=WAL")
self._connection.executescript(_SCHEMA)

def put(self, item: MemoryItem) -> MemoryItem:
source_event_id = None
source_sequence = None
if item.provenance.source_event is not None:
source_event_id = str(item.provenance.source_event.event_id)
source_sequence = item.provenance.source_event.sequence

with self._connection:
self._connection.execute(
"""
INSERT INTO memory_items (
memory_id,
kind,
title,
body,
tags_json,
source_event_id,
source_sequence,
source_task_ref,
authored_by,
confidence,
created_at,
supersedes_json,
evidence_refs_json,
superseded_by
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(memory_id)
DO UPDATE SET
kind = excluded.kind,
title = excluded.title,
body = excluded.body,
tags_json = excluded.tags_json,
source_event_id = excluded.source_event_id,
source_sequence = excluded.source_sequence,
source_task_ref = excluded.source_task_ref,
authored_by = excluded.authored_by,
confidence = excluded.confidence,
created_at = excluded.created_at,
supersedes_json = excluded.supersedes_json,
evidence_refs_json = excluded.evidence_refs_json,
superseded_by = excluded.superseded_by
""",
(
item.memory_id,
item.kind.value,
item.title,
item.body,
_json_tuple(item.tags),
source_event_id,
source_sequence,
item.provenance.source_task_ref,
item.provenance.authored_by,
item.provenance.confidence,
item.provenance.created_at.isoformat(),
_json_tuple(item.provenance.supersedes),
_json_tuple(item.provenance.evidence_refs),
item.superseded_by,
),
)
return item

def get(self, memory_id: str) -> MemoryItem | None:
row = self._connection.execute(
"""
SELECT *
FROM memory_items
WHERE memory_id = ?
""",
(memory_id,),
).fetchone()
if row is None:
return None
return _item_from_row(row)

def list_active(self, *, kind: MemoryKind | None = None) -> tuple[MemoryItem, ...]:
params: tuple[str, ...]
where = "WHERE superseded_by IS NULL"
if kind is None:
params = ()
else:
where += " AND kind = ?"
params = (kind.value,)
return tuple(self._select(f"SELECT * FROM memory_items {where} ORDER BY rowid ASC", params))

def list_rejected_paths(self) -> tuple[MemoryItem, ...]:
return tuple(item for item in self.list_active() if REJECTED_PATH_TAG in item.tags)

def supersede(self, memory_id: str, *, by_memory_id: str) -> MemoryItem:
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[sev2/tests] SqliteMemoryStore.supersede adds a public failure path for missing memory IDs, but the new tests only cover successful supersession. Add an adversarial test that asserts supersede('missing', by_memory_id=...) raises KeyError for the real store and fake, so this public error contract is locked down.

if self.get(by_memory_id) is None:
raise KeyError(by_memory_id)
with self._connection:
cursor = self._connection.execute(
"""
UPDATE memory_items
SET superseded_by = ?
WHERE memory_id = ?
""",
(by_memory_id, memory_id),
)
if cursor.rowcount != 1:
raise KeyError(memory_id)
superseded = self.get(memory_id)
if superseded is None:
raise KeyError(memory_id)
return superseded

def _select(self, query: str, params: tuple[str, ...]) -> Iterable[MemoryItem]:
rows = self._connection.execute(query, params)
return (_item_from_row(row) for row in rows)


def _json_tuple(values: tuple[str, ...]) -> str:
return json.dumps(list(values), sort_keys=True, separators=(",", ":"))


def _load_tuple(raw: str, field_name: str) -> tuple[str, ...]:
values = json.loads(raw)
if not isinstance(values, list) or not all(isinstance(value, str) for value in values):
raise ValueError(f"memory field {field_name} must be a JSON list of strings")
return tuple(values)


def _item_from_row(row: sqlite3.Row) -> MemoryItem:
source_event = None
if row["source_event_id"] is not None and row["source_sequence"] is not None:
source_event = EventRef(
event_id=EventId(row["source_event_id"]),
sequence=row["source_sequence"],
)
return MemoryItem(
memory_id=row["memory_id"],
kind=MemoryKind(row["kind"]),
title=row["title"],
body=row["body"],
tags=_load_tuple(row["tags_json"], "tags"),
provenance=MemoryProvenance(
source_event=source_event,
authored_by=row["authored_by"],
source_task_ref=row["source_task_ref"],
confidence=row["confidence"],
created_at=datetime.fromisoformat(row["created_at"]),
supersedes=_load_tuple(row["supersedes_json"], "supersedes"),
evidence_refs=_load_tuple(row["evidence_refs_json"], "evidence_refs"),
),
superseded_by=row["superseded_by"],
)
Loading
Loading