feat(subscribe): skip stale events on /subscribe initial backfill #8
aaronbrethorst wants to merge 3 commits into main
Design for making /subscribe/<source> drop events older than the source's SkewWindow during the initial replay drain, while leaving live tail and manual replay paths untouched. Aligns the auto-replay behavior with Standard Webhooks consumer timestamp tolerances.
Resolves ambiguities surfaced in audit:
- Sources map semantics: presence-checked, value is the resolved effective_skew (zero meaning "use verifier default" handled in server.Build, not in the handler).
- Drain split into initialDrain (filters) + liveDrain (unfiltered) so the type system enforces the live-tail-not-filtered invariant.
- One-sided filter: future and zero-value provider_timestamps emit.
- Cursor-advance-on-skip elevated to a load-bearing invariant with its own TDD case (all-stale batch + live emit).
- Manual-replay-on-SSE caveat tightened (only reaches subscribers whose cursor < replayed.seq).
- /audit correctly described as operator-action volume; not part of the recovery surface.
- Logging types pinned (slog.Int64 for seq, slog.Duration for age and skew_window).
- Doc strings to update named explicitly (README.md:3, README.md:107, docs/quickstart.md:138, plus handler.go package doc and CLAUDE.md).
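A minimal sketch of the one-sided filter and the pinned log attribute types listed above. It is not the PR's code: `event` is a hypothetical stand-in for `store.Event`, `keepFresh` is an illustrative name, and treating an event aged exactly `skew_window` as fresh is an assumption, since the boundary behavior is not stated here.

```go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"
)

// event is a hypothetical stand-in for store.Event; only the fields the
// filter needs are modeled.
type event struct {
	Sequence          int64
	ProviderTimestamp time.Time
}

// keepFresh returns a predicate for the initial drain. The filter is
// one-sided: only events older than skew are skipped, so zero-value and
// future timestamps are always emitted.
func keepFresh(now time.Time, skew time.Duration, log *slog.Logger) func(event) bool {
	return func(ev event) bool {
		if ev.ProviderTimestamp.IsZero() {
			return true // no provider timestamp: emit rather than guess
		}
		age := now.Sub(ev.ProviderTimestamp)
		if age <= skew { // assumption: age == skew still counts as fresh
			return true
		}
		// Attribute types follow the pinned logging types: Int64 for seq,
		// Duration for age and skew_window.
		log.LogAttrs(context.Background(), slog.LevelDebug, "skipping stale event",
			slog.Int64("seq", ev.Sequence),
			slog.Duration("age", age),
			slog.Duration("skew_window", skew))
		return false
	}
}

// demo evaluates the predicate for stale, boundary, future, and zero-value cases.
func demo() []bool {
	now := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC)
	keep := keepFresh(now, 5*time.Minute, slog.Default())
	return []bool{
		keep(event{1, now.Add(-10 * time.Minute)}), // stale
		keep(event{2, now.Add(-5 * time.Minute)}),  // exactly at the boundary
		keep(event{3, now.Add(time.Hour)}),         // future
		keep(event{4, time.Time{}}),                // zero value
	}
}

func main() {
	fmt.Println(demo())
}
```

Only the first case is dropped; the future and zero-value cases pass because the filter checks one direction only.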
Standard Webhooks consumers reject events whose `webhook-timestamp` is older than ~5 minutes. After a long disconnect, /subscribe replay would re-deliver stale events that the consumer would 401-reject on verification.

The initial-backfill drain now filters events older than the source's effective skew window (per-source `skew_window` from hooks.yaml or `sources.DefaultSkewWindow` when zero/unset, resolved at the seam in `internal/server.Build`). The cursor advances past skipped events so reconnects with `?since=<seq>` start past them and the live drain does not re-emit them.

Live tail (notifier-triggered or keepalive-triggered drains) is unfiltered, so manual replay via the inspector still reaches currently-connected subscribers.

The Handler.Sources field changes shape from map[string]bool to map[string]time.Duration; subscribe.New takes the same map.

Spec: docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md
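The seam resolution described above could be sketched roughly as follows. This is an illustration under assumptions, not the code in `internal/server.Build`: `sourceConfig`, `effectiveSkews`, and the source names `render` and `github` are hypothetical, and `defaultSkewWindow` stands in for `sources.DefaultSkewWindow` (5m).

```go
package main

import (
	"fmt"
	"time"
)

// defaultSkewWindow stands in for sources.DefaultSkewWindow (5m).
const defaultSkewWindow = 5 * time.Minute

// sourceConfig is a hypothetical slice of one hooks.yaml source entry.
type sourceConfig struct {
	Name       string
	SkewWindow time.Duration // zero means unset
}

// effectiveSkews resolves each source's skew once, at the seam, so the
// subscribe handler never has to interpret a zero duration.
func effectiveSkews(cfgs []sourceConfig) map[string]time.Duration {
	out := make(map[string]time.Duration, len(cfgs))
	for _, c := range cfgs {
		skew := c.SkewWindow
		if skew == 0 {
			skew = defaultSkewWindow
		}
		out[c.Name] = skew
	}
	return out
}

// demo shows the presence-checked lookup: a missing key means an unknown
// source (a 404 in the handler), never "skew of zero".
func demo() []string {
	skews := effectiveSkews([]sourceConfig{
		{Name: "render"},
		{Name: "github", SkewWindow: 30 * time.Second},
	})
	var out []string
	for _, name := range []string{"render", "github", "missing"} {
		skew, ok := skews[name]
		if !ok {
			out = append(out, name+": 404")
			continue
		}
		out = append(out, name+": "+skew.String())
	}
	return out
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```

Resolving the default in one place keeps the map's meaning simple: presence answers "is this source known?", and the value is always a usable threshold.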
📝 Walkthrough
This PR implements a stale-event filter for SSE
Changes
SSE Subscribe Stale-Event Backfill Filter
🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 2
🧹 Nitpick comments (1)
internal/subscribe/handler.go (1)
236-259: ⚡ Quick win

Refactor `readBatchAndEmit` to clear the complexity gate.

Line 236 currently exceeds the configured cognitive complexity threshold by a small margin; extracting per-batch emission logic into a helper should resolve this without behavior changes.

♻️ Minimal extraction sketch

     func (h *Handler) readBatchAndEmit(ctx context.Context, w io.Writer, flusher http.Flusher, source string, cursor int64, batchLimit int, keep func(store.Event) bool) (int64, error) {
         for {
             batch, err := h.Store.ReadSince(ctx, source, cursor, batchLimit)
             if err != nil {
                 return cursor, err
             }
             if len(batch) == 0 {
                 return cursor, nil
             }
    -        var wrote bool
    -        for _, ev := range batch {
    -            cursor = ev.Sequence
    -            if keep != nil && !keep(ev) {
    -                continue
    -            }
    -            if err := writeEvent(w, ev); err != nil {
    -                return cursor, err
    -            }
    -            wrote = true
    -        }
    +        var wrote bool
    +        cursor, wrote, err = emitBatch(w, batch, cursor, keep)
    +        if err != nil {
    +            return cursor, err
    +        }
             if wrote {
                 flusher.Flush()
             }
         }
     }
    +
    +func emitBatch(w io.Writer, batch []store.Event, cursor int64, keep func(store.Event) bool) (int64, bool, error) {
    +    var wrote bool
    +    for _, ev := range batch {
    +        cursor = ev.Sequence
    +        if keep != nil && !keep(ev) {
    +            continue
    +        }
    +        if err := writeEvent(w, ev); err != nil {
    +            return cursor, wrote, err
    +        }
    +        wrote = true
    +    }
    +    return cursor, wrote, nil
    +}
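To see why the cursor must advance before the `keep` check, here is a self-contained sketch of the extracted helper run against an all-stale batch. The `event` type, its `Stale` flag, and the simplified `writeEvent` are assumptions made for illustration; the real handler works on `store.Event` and its own SSE writer.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// event is a hypothetical stand-in for store.Event. The Stale flag replaces
// the real timestamp comparison to keep the example short.
type event struct {
	Sequence int64
	Payload  string
	Stale    bool
}

// writeEvent is a simplified stand-in for the handler's SSE writer.
func writeEvent(w io.Writer, ev event) error {
	_, err := fmt.Fprintf(w, "id: %d\ndata: %s\n\n", ev.Sequence, ev.Payload)
	return err
}

// emitBatch mirrors the extraction sketch: the cursor moves past every event,
// including skipped ones, and wrote reports whether a flush is needed.
func emitBatch(w io.Writer, batch []event, cursor int64, keep func(event) bool) (int64, bool, error) {
	var wrote bool
	for _, ev := range batch {
		cursor = ev.Sequence // advance first so skipped events are not re-read
		if keep != nil && !keep(ev) {
			continue
		}
		if err := writeEvent(w, ev); err != nil {
			return cursor, wrote, err
		}
		wrote = true
	}
	return cursor, wrote, nil
}

// demo drains an all-stale batch, either filtered (initial drain) or with a
// nil predicate (live drain).
func demo(filter bool) (int64, bool, string) {
	var buf bytes.Buffer
	batch := []event{
		{Sequence: 7, Payload: "a", Stale: true},
		{Sequence: 8, Payload: "b", Stale: true},
	}
	var keep func(event) bool
	if filter {
		keep = func(ev event) bool { return !ev.Stale }
	}
	cursor, wrote, err := emitBatch(&buf, batch, 0, keep)
	if err != nil {
		panic(err)
	}
	return cursor, wrote, buf.String()
}

func main() {
	cursor, wrote, out := demo(true)
	fmt.Printf("filtered: cursor=%d wrote=%v bytes=%d\n", cursor, wrote, len(out))
	cursor, wrote, out = demo(false)
	fmt.Printf("unfiltered: cursor=%d wrote=%v bytes=%d\n", cursor, wrote, len(out))
}
```

In the filtered run nothing is written, yet the cursor still ends at the last sequence, so a following live drain or a reconnect with `?since=<seq>` starts past the skipped events.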
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md`:
- Around line 15-19: The fenced code block showing log output lacks a language
identifier; update the block in the file (the three-backtick fenced block
containing "Render webhook: signature verification failed...") to include a
language tag such as text or log (e.g., change ``` to ```text or ```log) so
markdownlint passes and the logs render correctly.
- Line 3: Update the document's Status line from "approved, pending
implementation" to reflect that the work is implemented; edit the "Status:"
field in this spec to either "implemented" or "approved, implemented in PR `#8`"
so it matches the implemented tests and the PR summary (refer to the "Status:"
heading in this spec file).
---
Nitpick comments:
In `@internal/subscribe/handler.go`:
- Around line 236-259: readBatchAndEmit is slightly over the cognitive
complexity limit; extract the inner per-batch emission loop into a new helper
(e.g., emitBatch or emitEventsForBatch) that accepts ctx, w io.Writer, flusher
http.Flusher, batch []store.Event, cursor *int64, keep func(store.Event) bool
and returns (wrote bool, err error) so the main readBatchAndEmit loop simply
calls h.emitBatch(...), updates cursor from the helper (or the helper mutates
the passed cursor), handles the returned error and flush decision, and preserves
existing calls to writeEvent and flusher.Flush; keep all original return
semantics from readBatchAndEmit and ensure the helper respects skipping via keep
and sets cursor to ev.Sequence for each processed event.
📒 Files selected for processing (9)
- CLAUDE.md
- README.md
- docs/quickstart.md
- docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md
- internal/server/server.go
- internal/sources/render.go
- internal/sources/sources.go
- internal/subscribe/handler.go
- internal/subscribe/handler_test.go
@@ -0,0 +1,179 @@
# Skip stale events on `/subscribe` initial backfill

**Status:** approved, pending implementation
Update the status field to reflect implementation completion.
The status currently reads "approved, pending implementation," but this PR includes the implementation (per the PR summary: "10 TDD cases are implemented in internal/subscribe/handler_test.go"). Consider updating to "implemented" or "approved, implemented in PR #8."
```
Render webhook: signature verification failed: Message timestamp too old
Filter chain halted as :verify_render_signature rendered or redirected
Completed 401 Unauthorized
```
Add language identifier to fenced code block.
The code block displaying error logs should specify a language identifier to satisfy markdownlint. Since this is log output, consider using ```text or ```log.
📝 Proposed fix

    -```
    +```text
     Render webhook: signature verification failed: Message timestamp too old
     Filter chain halted as :verify_render_signature rendered or redirected
     Completed 401 Unauthorized
🧰 Tools
🪛 markdownlint-cli2 (0.22.1)
[warning] 15-15: Fenced code blocks should have a language specified (MD040, fenced-code-language)



Summary
- `/subscribe/<source>` initial backfill now skips events whose `provider_timestamp` is older than the source's effective skew window (the same `effective_skew` ingest already enforces: per-source `skew_window` from `hooks.yaml`, falling back to `sources.DefaultSkewWindow` = 5m). The cursor advances past skipped events so reconnects don't reconsider them and the unfiltered live drain doesn't re-emit them.
- `hooksctl replay`.
- `Handler.Sources` shape changes from `map[string]bool` to `map[string]time.Duration` (the per-source effective skew, resolved at the seam in `internal/server.Build` so the handler never sees zero).
- `Message timestamp too old` after a long disconnect: the relay durably stores webhooks, but consumers verify timestamps strictly, so automatic replay across the consumer's tolerance always failed.

Spec
`docs/superpowers/specs/2026-05-09-subscribe-stale-backfill-filter-design.md`: all 10 TDD cases implemented in `internal/subscribe/handler_test.go`.

Test plan
- `go test -race ./...` clean
- `make lint` clean (golangci-lint v2: errcheck, govet, staticcheck, unused, gosec, errorlint, etc.)
- `TestInitialBackfillSkipsStaleEvent`, `TestInitialBackfillDeliversFreshEvent`, `TestInitialBackfillMixedBatchOnlyFreshEmittedAndIdempotentOnReconnect`, `TestInitialBackfillAllStaleStillAdvancesCursor`, `TestLiveTailDoesNotFilter`, `TestInitialBackfillBoundaryAtExactlySkew`, `TestInitialBackfillFutureTimestampPasses`, `TestInitialBackfillZeroProviderTimestampPasses`, `TestInitialBackfillSkipIsObservable`, `TestUnknownSourceIs404WithMapShape`
- `hooks` binary: ingest a fresh signed webhook → SSE delivers it; ingest a stale signed webhook → ingest correctly rejects with 401 (defense in depth: the SSE filter is the second line, ingest is the first).

Documentation updated
- `README.md` (top-line description + section 6 description of `forward`)
- `docs/quickstart.md` (section 7: `forward` behavior)
- `CLAUDE.md` (`internal/subscribe` bullet: new policy + threshold source)
- `internal/subscribe/handler.go` package doc
- `internal/sources/sources.go`: promoted `renderDefaultSkew` from `render.go` to package-level `DefaultSkewWindow` so the seam in `internal/server.Build` references the same constant the verifier factories do.

Review notes (non-blocking, surfacing for visibility)
Multi-agent review found these items worth noting at review time:
- Zero-skew silent substitution. `internal/server/server.go:96-99` falls back to `sources.DefaultSkewWindow` when `src.SkewWindow == 0`. The spec explicitly documents this as preserved pre-existing behavior ("Operator sets `skew_window: '0s'` expecting no enforcement, gets 5m default everywhere - pre-existing ingest behavior; this spec preserves it consistently rather than introducing a second convention"). If we want to distinguish "unset" from "explicit 0s" later, that's a separate change to `internal/config/config.go` (likely making the field a `*time.Duration`).
- `keep=nil` API on `readBatchAndEmit`. A future caller that filters via `liveDrain` would need to be careful not to silently drop events. The spec is explicit that live tail does not filter, so the single shared inner helper with a nil predicate is the cleanest expression of that today; renaming to `skip` (inverted predicate) is a reasonable follow-up if a second filtered call site appears.
- No keepalive during long initial drain. The keepalive ticker only starts after `initialDrain` returns. A subscriber draining a very large backlog of all-stale events would see no SSE traffic for the duration. Out of scope for this PR but worth a follow-up if anyone reports stuck connections after this lands.
- Skip log is debug-level and not a metric. Per spec ("Operator surprise - events present in the store but never replayed: Debug log on every skip; observable via `--dev` or `HOOKS_LOG_LEVEL=debug`"). If operational observability becomes a problem, a counter is the natural follow-up.

Summary by CodeRabbit
Release Notes
Enhancements
Documentation