feat(subscribe): skip stale events on /subscribe initial backfill #8

Open
aaronbrethorst wants to merge 3 commits into main from no-old-messages

Conversation

@aaronbrethorst
Member

@aaronbrethorst aaronbrethorst commented May 10, 2026

Summary

  • /subscribe/<source> initial backfill now skips events whose provider_timestamp is older than the source's effective skew window (the same effective_skew ingest already enforces — per-source skew_window from hooks.yaml, falling back to sources.DefaultSkewWindow = 5m). The cursor advances past skipped events so reconnects don't reconsider them and the unfiltered live drain doesn't re-emit them.
  • Live tail (notifier-triggered or keepalive-triggered drains) is unfiltered, so the inspector's "Replay to listeners" path still reaches currently-connected SSE subscribers; older events remain in the store and recoverable via the inspector or hooksctl replay.
  • Handler.Sources shape changes from map[string]bool to map[string]time.Duration (the per-source effective skew, resolved at the seam in internal/server.Build so the handler never sees zero).
  • Motivation: a Standard Webhooks consumer returned 401 with "Message timestamp too old" after a long disconnect. The relay stores webhooks durably, but consumers verify timestamps strictly, so automatic replay of events older than the consumer's tolerance always failed.

Spec

docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md — all 10 TDD cases implemented in internal/subscribe/handler_test.go.

Test plan

  • go test -race ./... clean
  • make lint clean (golangci-lint v2: errcheck, govet, staticcheck, unused, gosec, errorlint, etc.)
  • All 10 spec TDD cases pass: TestInitialBackfillSkipsStaleEvent, TestInitialBackfillDeliversFreshEvent, TestInitialBackfillMixedBatchOnlyFreshEmittedAndIdempotentOnReconnect, TestInitialBackfillAllStaleStillAdvancesCursor, TestLiveTailDoesNotFilter, TestInitialBackfillBoundaryAtExactlySkew, TestInitialBackfillFutureTimestampPasses, TestInitialBackfillZeroProviderTimestampPasses, TestInitialBackfillSkipIsObservable, TestUnknownSourceIs404WithMapShape
  • Pre-existing subscribe tests remain green (replay-then-live, since=latest, Last-Event-ID override, keepalive-on-idle, release-on-disconnect, concurrent-subscribers, unknown-source-404)
  • Manual end-to-end against a live hooks binary: ingest a fresh signed webhook → SSE delivers it; ingest a stale signed webhook → ingest correctly rejects with 401 (defense in depth — the SSE filter is the second line, ingest is the first).

Documentation updated

  • README.md (top-line description + section 6 description of forward)
  • docs/quickstart.md (section 7 — forward behavior)
  • CLAUDE.md (internal/subscribe bullet — new policy + threshold source)
  • internal/subscribe/handler.go package doc
  • internal/sources/sources.go — promoted renderDefaultSkew from render.go to package-level DefaultSkewWindow so the seam in internal/server.Build references the same constant the verifier factories do.

Review notes (non-blocking, surfacing for visibility)

Multi-agent review found these items worth noting at review time:

  1. Zero-skew silent substitution. internal/server/server.go:96-99 falls back to sources.DefaultSkewWindow when src.SkewWindow == 0. The spec explicitly documents this as preserved pre-existing behavior ("Operator sets skew_window: '0s' expecting no enforcement, gets 5m default everywhere — pre-existing ingest behavior; this spec preserves it consistently rather than introducing a second convention"). If we want to distinguish "unset" from "explicit 0s" later, that's a separate change to internal/config/config.go (likely making the field a *time.Duration).

  2. keep=nil API on readBatchAndEmit. A future caller that filters via liveDrain would need to be careful not to silently drop events. The spec is explicit that live tail does not filter, so the single shared inner helper with a nil predicate is the cleanest expression of that today; renaming to skip (inverted predicate) is a reasonable follow-up if a second filtered call site appears.

  3. No keepalive during long initial drain. The keepalive ticker only starts after initialDrain returns. A subscriber draining a very large backlog of all-stale events would see no SSE traffic for the duration. Out of scope for this PR but worth a follow-up if anyone reports stuck connections after this lands.

  4. Skip log is debug-level and not a metric. Per spec ("Operator surprise — events present in the store but never replayed: Debug log on every skip; observable via --dev or HOOKS_LOG_LEVEL=debug"). If operational observability becomes a problem, a counter is the natural follow-up.

Summary by CodeRabbit

Release Notes

  • Enhancements

    • Initial webhook event synchronization now filters out events older than the source's signature-verification time window (default 5 minutes), while preserving the complete event record available for manual replay.
    • Live event streams continue to deliver all events in real-time without filtering.
  • Documentation

    • Updated guides to clarify event filtering during initial synchronization and manual replay options.

Design for making /subscribe/<source> drop events older than the
source's SkewWindow during the initial replay drain, while leaving
live tail and manual replay paths untouched. Aligns the auto-replay
behavior with Standard Webhooks consumer timestamp tolerances.

Resolves ambiguities surfaced in audit:
- Sources map semantics: presence-checked, value is the resolved
  effective_skew (zero meaning "use verifier default" handled in
  server.Build, not in the handler).
- Drain split into initialDrain (filters) + liveDrain (unfiltered) so
  the type system enforces the live-tail-not-filtered invariant.
- One-sided filter: future and zero-value provider_timestamps emit.
- Cursor-advance-on-skip elevated to a load-bearing invariant with
  its own TDD case (all-stale batch + live emit).
- Manual-replay-on-SSE caveat tightened (only reaches subscribers
  whose cursor < replayed.seq).
- /audit correctly described as operator-action volume; not part of
  the recovery surface.
- Logging types pinned (slog.Int64 for seq, slog.Duration for age and
  skew_window).
- Doc strings to update named explicitly (README.md:3, README.md:107,
  docs/quickstart.md:138, plus handler.go package doc and CLAUDE.md).
Standard Webhooks consumers reject events whose `webhook-timestamp` is
older than ~5 minutes. After a long disconnect, /subscribe replay would
re-deliver stale events that the consumer would 401-reject on
verification.

The initial-backfill drain now filters events older than the source's
effective skew window (per-source `skew_window` from hooks.yaml or
`sources.DefaultSkewWindow` when zero/unset, resolved at the seam in
`internal/server.Build`). The cursor advances past skipped events so
reconnects with `?since=<seq>` start past them and the live drain does
not re-emit them. Live tail (notifier-triggered or keepalive-triggered
drains) is unfiltered, so manual replay via the inspector still reaches
currently-connected subscribers.
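
The cursor-advance invariant in the paragraph above can be illustrated with a small in-memory sketch. It is a simplification under stated assumptions: the `Event` type with its `Stale` flag and the `drain` function are invented for the example and stand in for the store read loop and the timestamp comparison.

```go
package main

import "fmt"

// Event is a hypothetical stand-in; Stale replaces the real timestamp check.
type Event struct {
	Sequence int64
	Stale    bool
}

// drain visits events after cursor, emits those that keep accepts
// (a nil keep emits everything), and returns the advanced cursor.
func drain(events []Event, cursor int64, keep func(Event) bool, emit func(Event)) int64 {
	for _, ev := range events {
		if ev.Sequence <= cursor {
			continue
		}
		cursor = ev.Sequence // advance even when the event is skipped
		if keep != nil && !keep(ev) {
			continue
		}
		emit(ev)
	}
	return cursor
}

func main() {
	stored := []Event{{Sequence: 1, Stale: true}, {Sequence: 2, Stale: true}}
	var emitted []int64
	emit := func(ev Event) { emitted = append(emitted, ev.Sequence) }

	// Initial backfill: filtered, all-stale batch, cursor still advances.
	cursor := drain(stored, 0, func(ev Event) bool { return !ev.Stale }, emit)

	// Live tail: unfiltered, so a later event is emitted regardless of age,
	// and the skipped events are not reconsidered.
	stored = append(stored, Event{Sequence: 3, Stale: true})
	cursor = drain(stored, cursor, nil, emit)

	fmt.Println(cursor, emitted)
}
```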

The Handler.Sources field changes shape from map[string]bool to
map[string]time.Duration; subscribe.New takes the same map.

Spec: docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md
@sonarqubecloud

@coderabbitai

coderabbitai Bot commented May 10, 2026

📝 Walkthrough

This PR implements a stale-event filter for SSE /subscribe initial backfill. The handler resolves per-source effective skew windows, filters events older than the skew cutoff during initial drain while advancing the cursor, and keeps the live tail unfiltered. The shared DefaultSkewWindow constant (5 minutes) is used across verification and SSE wiring to ensure consistent defaults.

Changes

SSE Subscribe Stale-Event Backfill Filter

| Layer | File(s) | Summary |
|---|---|---|
| Shared Skew Window Constant | internal/sources/sources.go | Added exported DefaultSkewWindow constant (5 minutes) as the verifier fallback for sources without explicit skew_window configuration. |
| Handler Type & Constructor Signature | internal/subscribe/handler.go | Handler.Sources changes from map[string]bool to map[string]time.Duration; added injected Now func() time.Time field; New signature updated to accept skew-duration map instead of source list. |
| Core Filtering & Drain Methods | internal/subscribe/handler.go | Implemented initialDrain method that filters stale events using per-source cutoff (now − skew), skips older events while advancing cursor, and liveDrain method for unfiltered event emission; refactored batch reading into readBatchAndEmit with optional keep-predicate. |
| Stream Startup & Live Loop Control | internal/subscribe/handler.go | Stream startup calls initialDrain first for bounded backfill, then loops between notifier-driven and keepalive-tick liveDrain calls for unfiltered live events. |
| Source Validation | internal/subscribe/handler.go | Source validation switched from boolean membership check to key presence in skew-duration map. |
| Server Build: Skew Window Resolution | internal/server/server.go | Computes subscribeSkews map by resolving each configured source's SkewWindow (defaulting zero to DefaultSkewWindow), passed to subscribe.New instead of raw source names. |
| Render Verifier Alignment | internal/sources/render.go | newRenderVerifier now defaults to shared DefaultSkewWindow instead of file-local constant when opts.SkewWindow is zero. |
| Test Harness & Utilities | internal/subscribe/handler_test.go | Test setup constructs Handler with map[string]time.Duration; added helpers: appendEventAt for timestamped events, readWithDeadline for SSE polling, connect for authenticated subscriptions, fixedNow for deterministic time. |
| Filtering Behavior Tests | internal/subscribe/handler_test.go | Tests validate stale skipping, fresh delivery, mixed-batch filtering with idempotent reconnects, cursor advancement when all backfill events are stale, and live tail remaining unfiltered. |
| Boundary & Edge Case Tests | internal/subscribe/handler_test.go | Tests cover exact-boundary events (emitted), future timestamps (pass), zero timestamps (forward-compat), unknown sources returning 404, and skip observability via structured logging. |
| Design Specification | docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md | Comprehensive spec detailing filtering behavior, timestamp field and resolution, architecture (Handler changes, drain methods, shared read loop), TDD plan (10 tests), unchanged paths, risks, and completion criteria. |
| User Documentation | CLAUDE.md, README.md, docs/quickstart.md | Clarified initial replay bounded by skew window (5 min for Render), older events skipped but stored/replayable, cursor advanced, bytes match original, and live tail unfiltered. |

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 36.36% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and concisely describes the main change: skipping stale events during the initial backfill phase of the /subscribe endpoint. It is specific, avoids generic language, and directly corresponds to the primary focus of the changeset. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
internal/subscribe/handler.go (1)

236-259: ⚡ Quick win

Refactor readBatchAndEmit to clear the complexity gate.

Line 236 currently exceeds the configured cognitive complexity threshold by a small margin; extracting per-batch emission logic into a helper should resolve this without behavior changes.

♻️ Minimal extraction sketch
 func (h *Handler) readBatchAndEmit(ctx context.Context, w io.Writer, flusher http.Flusher, source string, cursor int64, batchLimit int, keep func(store.Event) bool) (int64, error) {
 	for {
 		batch, err := h.Store.ReadSince(ctx, source, cursor, batchLimit)
 		if err != nil {
 			return cursor, err
 		}
 		if len(batch) == 0 {
 			return cursor, nil
 		}
-		var wrote bool
-		for _, ev := range batch {
-			cursor = ev.Sequence
-			if keep != nil && !keep(ev) {
-				continue
-			}
-			if err := writeEvent(w, ev); err != nil {
-				return cursor, err
-			}
-			wrote = true
-		}
+		var wrote bool
+		cursor, wrote, err = emitBatch(w, batch, cursor, keep)
+		if err != nil {
+			return cursor, err
+		}
 		if wrote {
 			flusher.Flush()
 		}
 	}
 }
+
+func emitBatch(w io.Writer, batch []store.Event, cursor int64, keep func(store.Event) bool) (int64, bool, error) {
+	var wrote bool
+	for _, ev := range batch {
+		cursor = ev.Sequence
+		if keep != nil && !keep(ev) {
+			continue
+		}
+		if err := writeEvent(w, ev); err != nil {
+			return cursor, wrote, err
+		}
+		wrote = true
+	}
+	return cursor, wrote, nil
+}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/subscribe/handler.go` around lines 236 - 259, readBatchAndEmit is
slightly over the cognitive complexity limit; extract the inner per-batch
emission loop into a new helper (e.g., emitBatch or emitEventsForBatch) that
accepts ctx, w io.Writer, flusher http.Flusher, batch []store.Event, cursor
*int64, keep func(store.Event) bool and returns (wrote bool, err error) so the
main readBatchAndEmit loop simply calls h.emitBatch(...), updates cursor from
the helper (or the helper mutates the passed cursor), handles the returned error
and flush decision, and preserves existing calls to writeEvent and
flusher.Flush; keep all original return semantics from readBatchAndEmit and
ensure the helper respects skipping via keep and sets cursor to ev.Sequence for
each processed event.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: a10ba395-9a46-4c3e-8d5a-127facfaa754

📥 Commits

Reviewing files that changed from the base of the PR and between 54146b4 and 34982b7.

📒 Files selected for processing (9)
  • CLAUDE.md
  • README.md
  • docs/quickstart.md
  • docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md
  • internal/server/server.go
  • internal/sources/render.go
  • internal/sources/sources.go
  • internal/subscribe/handler.go
  • internal/subscribe/handler_test.go

@@ -0,0 +1,179 @@
# Skip stale events on `/subscribe` initial backfill

**Status:** approved, pending implementation

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Update the status field to reflect implementation completion.

The status currently reads "approved, pending implementation," but this PR includes the implementation (per the PR summary: "10 TDD cases are implemented in internal/subscribe/handler_test.go"). Consider updating to "implemented" or "approved, implemented in PR #8."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md`
at line 3, Update the document's Status line from "approved, pending
implementation" to reflect that the work is implemented; edit the "Status:"
field in this spec to either "implemented" or "approved, implemented in PR `#8`"
so it matches the implemented tests and the PR summary (refer to the "Status:"
heading in this spec file).

Comment on lines +15 to +19
```
Render webhook: signature verification failed: Message timestamp too old
Filter chain halted as :verify_render_signature rendered or redirected
Completed 401 Unauthorized
```

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add language identifier to fenced code block.

The code block displaying error logs should specify a language identifier to satisfy markdownlint. Since this is log output, consider using ```text or ```log.

📝 Proposed fix
-```
+```text
 Render webhook: signature verification failed: Message timestamp too old
 Filter chain halted as :verify_render_signature rendered or redirected
 Completed 401 Unauthorized

🧰 Tools

🪛 markdownlint-cli2 (0.22.1)

[warning] 15-15: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🤖 Prompt for AI Agents

Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md`
around lines 15 - 19, The fenced code block showing log output lacks a language
identifier; update the block in the file (the three-backtick fenced block
containing "Render webhook: signature verification failed...") to include a
language tag such as text or log (e.g., change ``` to ```text or ```log) so
markdownlint passes and the logs render correctly.
