Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions pipeline/consolidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,50 @@

from __future__ import annotations

import re

from .prompts import build_consolidation_prompt
from .haiku import call_haiku
from .types import ConsolidationResult, TokenUsage


# A real memory entry header: "## HH:MM", "## Week of ...", or "## YYYY-MM-DD".
_ENTRY_HEADER = re.compile(r"(?m)^## (\d{2}:\d{2}|Week of |\d{4}-\d{2}-\d{2})")


class ConsolidationSkipped(Exception):
"""Raised when Haiku declined (SKIP) or returned non-conforming output.

Signals that the consolidation produced no usable result. The caller
MUST NOT overwrite recent.md/archive.md and MUST NOT retire (rename to
``.done.md``) the source staging files — they are left for the next run.
"""


def _is_valid_consolidation(text: str) -> bool:
"""Return True only if the model output is real consolidated memory.

The expected output is the ``===RECENT===`` / ``===ARCHIVE===`` envelope.
A response with neither the recent delimiter nor a single ``## `` entry
header is conversational text (a refusal, a clarifying question, or a
"here is what I would compress…" preamble) and must be rejected — writing
it would replace memory with chatter and the source files would be lost.

Args:
text: Raw text response from Haiku.

Returns:
True if the text carries the recent delimiter or at least one memory
entry header; False for empty or purely conversational responses.
"""
t = text.strip()
if not t:
return False
if "===RECENT===" in t:
return True
return _ENTRY_HEADER.search(t) is not None


def consolidate(
staging_contents: dict[str, str],
recent: str,
Expand All @@ -44,10 +83,22 @@ def consolidate(

Raises:
RuntimeError: If the Haiku call fails or times out.
ConsolidationSkipped: If the model declined (SKIP) or returned
non-conforming output. The caller must not write or retire files.
"""
prompt = build_consolidation_prompt(staging_contents, recent, archive)
result = call_haiku(prompt, timeout=180)

# Guard: never let a SKIP or a conversational reply overwrite memory.
# Without this, a non-conforming response falls through to the parser's
# fallback and chatter is written as recent.md (and the source staging
# files are then irreversibly renamed to .done.md by the shell).
if result.is_skip or not _is_valid_consolidation(result.text):
raise ConsolidationSkipped(
"Haiku returned no usable consolidation "
"(SKIP or missing ===RECENT===/entry headers)"
)

recent_new, archive_new = parse_consolidation_response(result.text)

return ConsolidationResult(
Expand Down
15 changes: 13 additions & 2 deletions pipeline/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def cmd_consolidate(staging_dir: str, recent_file: str, archive_file: str) -> No
import tempfile

from ._tz import today_str
from .consolidate import consolidate
from .consolidate import consolidate, ConsolidationSkipped

today = today_str()

Expand All @@ -241,7 +241,17 @@ def cmd_consolidate(staging_dir: str, recent_file: str, archive_file: str) -> No
with open(archive_file, encoding="utf-8") as f:
archive = f.read()

result = consolidate(staging_contents, recent, archive)
try:
result = consolidate(staging_contents, recent, archive)
except ConsolidationSkipped:
# Model declined (SKIP) or returned non-conforming output. Emit a
# skip status so the shell leaves recent.md/archive.md untouched and
# does NOT rename the source staging files to .done.md — they remain
# available for the next run. STAGING_COUNT is non-zero (we did find
# files) but the shell gates on CONSOLIDATION_STATUS.
print(f"STAGING_COUNT={len(staging_contents)}")
print("CONSOLIDATION_STATUS=skip")
return

# Write results to temp files
fd_r, recent_out = tempfile.mkstemp(prefix="remember-recent-", suffix=".md")
Expand All @@ -262,6 +272,7 @@ def cmd_consolidate(staging_dir: str, recent_file: str, archive_file: str) -> No
f.write(os.path.join(staging_dir, name).encode() + b"\x00")

print(f"STAGING_COUNT={len(staging_contents)}")
print("CONSOLIDATION_STATUS=ok")
print(f"RECENT_OUT={_shell_escape(recent_out)}")
print(f"ARCHIVE_OUT={_shell_escape(archive_out)}")
print(f"TK_IN={result.tokens.input}")
Expand Down
12 changes: 11 additions & 1 deletion scripts/run-consolidation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,23 @@ RESULT=$(cd "$PIPELINE_DIR" && $PYTHON -m pipeline.shell consolidate "$STAGING_D
exit 1
}

# eval sets: STAGING_COUNT, RECENT_OUT, ARCHIVE_OUT, TK_IN/OUT/CACHE/COST, STAGING_PATHS_FILE
# eval sets: STAGING_COUNT, CONSOLIDATION_STATUS, RECENT_OUT, ARCHIVE_OUT, TK_IN/OUT/CACHE/COST, STAGING_PATHS_FILE
safe_eval <<< "$RESULT"

if [ "${STAGING_COUNT:-0}" -eq 0 ]; then
log "consolidation" "no staging files"; exit 0
fi

# Skip guard: if the model declined or returned non-conforming output, the
# pipeline emits CONSOLIDATION_STATUS=skip and no RECENT_OUT/ARCHIVE_OUT.
# Do NOT overwrite memory and do NOT retire staging files — leave everything
# in place so the next run retries. (Default to ok for backward compatibility
# with any caller that does not emit the status.)
if [ "${CONSOLIDATION_STATUS:-ok}" != "ok" ]; then
log "consolidation" "skip: status=${CONSOLIDATION_STATUS} — memory + staging files left untouched"
exit 0
fi

# --- Write output ---
cp "$RECENT_OUT" "$RECENT_FILE"
cp "$ARCHIVE_OUT" "$ARCHIVE_FILE"
Expand Down
84 changes: 83 additions & 1 deletion tests/test_consolidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@
import sys
from unittest.mock import patch, MagicMock

import pytest

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from pipeline.consolidate import parse_consolidation_response, consolidate
from pipeline.consolidate import (
parse_consolidation_response,
consolidate,
_is_valid_consolidation,
ConsolidationSkipped,
)
from pipeline.types import HaikuResult, TokenUsage, ConsolidationResult


Expand Down Expand Up @@ -143,3 +150,78 @@ def test_parse_empty_sections_between_markers():
# Both sections strip to "" — headers are only added when content is non-empty
assert recent == ""
assert archive == ""


# --- Validation guard: reject conversational / SKIP responses (issue #89) ---

def test_is_valid_rejects_refusal_text():
assert not _is_valid_consolidation(
"I cannot complete this compression task. The input is incomplete:"
)


def test_is_valid_rejects_clarifying_question():
assert not _is_valid_consolidation(
"I don't see a specific task. What would you like help with?"
)


def test_is_valid_rejects_empty():
assert not _is_valid_consolidation(" \n ")


def test_is_valid_accepts_envelope():
assert _is_valid_consolidation("===RECENT===\n# Recent\n## 2026-06-01\nx")


def test_is_valid_accepts_bare_body_with_entries():
assert _is_valid_consolidation("## 2026-06-01\nDid the thing.")
assert _is_valid_consolidation("## 14:32 | main\nDid the thing.")
assert _is_valid_consolidation("# Archive\n## Week of 2026-06-01\nx")


def test_consolidate_skips_on_refusal():
"""A conversational refusal must raise ConsolidationSkipped, not be written."""
refusal = HaikuResult(
text="I cannot complete this compression task. The input is incomplete.",
tokens=TokenUsage(input=100, output=20, cache=0, cost_usd=0.0001),
)
with patch("pipeline.consolidate.call_haiku", return_value=refusal):
with pytest.raises(ConsolidationSkipped):
consolidate(
staging_contents={"today-2026-06-01.md": "Did things."},
recent="# Recent\n\nold",
archive="# Archive\n\nold",
)


def test_consolidate_skips_on_skip_flag():
"""An explicit SKIP response must raise ConsolidationSkipped."""
skip = HaikuResult(
text="SKIP",
tokens=TokenUsage(input=50, output=1, cache=0, cost_usd=0.0),
is_skip=True,
)
with patch("pipeline.consolidate.call_haiku", return_value=skip):
with pytest.raises(ConsolidationSkipped):
consolidate(
staging_contents={"today-2026-06-01.md": "x"},
recent="",
archive="",
)


def test_consolidate_accepts_valid_envelope():
"""A well-formed envelope still consolidates normally."""
ok = HaikuResult(
text="===RECENT===\n# Recent\n\n## 2026-06-01\nDid things.\n\n===ARCHIVE===\n# Archive\n\nOld.",
tokens=TokenUsage(input=100, output=50, cache=0, cost_usd=0.0001),
)
with patch("pipeline.consolidate.call_haiku", return_value=ok):
result = consolidate(
staging_contents={"today-2026-06-01.md": "Did things."},
recent="",
archive="",
)
assert "2026-06-01" in result.recent
assert "Old" in result.archive