feat: Publisher interface and adapter event types#7
Merged
Conversation
0e13e6b to
5616225
Compare
5616225 to
85acb90
Compare
Contributor
Code reviewNo issues found. Checked for bugs and CLAUDE.md compliance. 🤖 Generated with Claude Code - If this code review was useful, please react with 👍. Otherwise, react with 👎. |
marino39
approved these changes
May 13, 2026
Contributor
marino39
left a comment
There was a problem hiding this comment.
LGTM - however you need to rebase
Introduce runnable.Publisher (single-method, takes any event), a fanout Publishers type, ctx helpers (PublisherFrom, Publish), and the WithPublisher Option that stacks additively across multiple calls. Add event types (RetryEvent, DrainStartedEvent, DrainTimedOutEvent, PanicRecoveredEvent) that adapters publish in later commits. Run installs the configured publisher into runCtx so adapters can reach it via runnable.Publish without explicit wiring.
Wire publish calls into the three event-emitting adapters: - Retry publishes runnable.RetryEvent (1-indexed Attempt + Err) after each failed attempt that does not exhaust the budget. Context-error give-ups do not emit an event. - Draining publishes runnable.DrainStartedEvent when the outer ctx is cancelled and runnable.DrainTimedOutEvent when the grace window expires. - Recovering drops its PanicHandler argument; panics now surface exclusively via runnable.PanicRecoveredEvent on the ctx Publisher. Update the ticker-with-drain example to install a logPublisher implementing the Publisher interface and switch on event type.
WithStatus registers a per-id statusPublisher with the runnable, which forwards events to StatusStore.publish. RetryEvent increments the restarts counter for that id; other events are no-ops for now. Re-introduces the Restarts field on Status that was dropped earlier in this stack when WithRetry's onStart side-channel was severed. The counter is now event-driven, so any Publisher (not just adapters.Retry) that emits RetryEvent will be reflected. Add an integration test in runnable_test exercising WithStatus + adapters.Retry end-to-end (lives in runnable_test rather than runnable to avoid an import cycle).
Add an Observability via Publisher section showing how to subscribe to RetryEvent / DrainStartedEvent / PanicRecoveredEvent through a custom Publisher implementation and runnable.WithPublisher. Update the migration table for the Recovering signature change (no PanicHandler argument; use a Publisher subscriber instead) and note that Status.Restarts is now event-driven rather than removed.
Trim doc comments on Publisher / Publishers / PublisherFrom / Publish / WithPublisher to one or two purpose-stating lines each.
The helper does not always append — it returns one operand unchanged when the other is nil, and only fans out via Publishers when both are present. merge captures the semantics more precisely.
d38c7cd to
f34fb06
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Adds an observability layer for adapters. A
runnable.Publisherinterface (singlePublish(any)method) is installed onto the runnable's ctx viarunnable.WithPublisher; adapters callrunnable.Publish(ctx, event)to emit typed events.StatusStorebecomes aPublisherandStatus.Restartsis back — counted fromRetryEventinstead of the deprecatedWithRetryonStartside-channel.Stacked on #4 — review against
feat/drain-and-ticker, notmaster.Closes marino39's #2 quick-glance item (StatusStore refactor so adapters can store stuff).
Commits
01eed91—Publisherinterface +Publishersfanout +WithPublisherOption + ctx helpers (PublisherFrom,Publish). Event types (RetryEvent,DrainStartedEvent,DrainTimedOutEvent,PanicRecoveredEvent).94d8068— Wire publish calls into Retry / Draining / Recovering.Recoveringdrops itsPanicHandlerargument (single way to handle panics: subscribe viaPublisher).8700487—WithStatusregisters a tagged statusPublisher;Status.Restartsincrements fromRetryEvent. Integration test (inrunnable_testto avoid an import cycle) verifiesWithStatus+adapters.Retryend-to-end.5616225— README "Observability via Publisher" section + updated migration table.Shape
Test plan
go test -race -count=1 ./...— passes.r.publisher = mergePublisher(...)wiring inWithStatus.applyand confirmedTestStatusRestartsCountedFromRetryAdapterfails (expected 2, got 0) before restoring.golangci-lint run ./...— clean.Notes
Publish(event any)— single method, no source/ID parameter.WithStatustags events with the runnable ID via an internalstatusPublisherwrapper.WithPublisher(andWithStatus) stack additively via aPublishersfanout type. Order doesn't matter.Publisher.Publishruns on the goroutine that emits. Documented in README. Subscribers needing async dispatch should buffer internally.adapters.Recovering()no longer takes aPanicHandler. Migration is one extra Option (WithPublisher) — documented.