Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ them with `runnable.WithAdapters` (left-to-right = outermost-to-innermost):
```go
r := runnable.New(reconcile, runnable.WithAdapters(
adapters.Draining(10*time.Second),
adapters.Recovering(reportPanic),
adapters.Recovering(),
adapters.Retry(3, time.Minute),
adapters.Ticker(30*time.Second),
))
Expand All @@ -116,12 +116,13 @@ cancelled or the work returns an error. Composes with Draining: an
in-flight tick is allowed to finish before the loop exits.

**Recovering** — turns panics in the wrapped work into errors and
invokes the optional handler before returning. Place inside Draining
when both are in use.
emits a `runnable.PanicRecoveredEvent` to the Publisher on ctx. Place
inside Draining when both are in use.

**Retry** — re-invokes the wrapped work up to `maxRetries` times on
non-context errors. If `resetAfter` is non-zero and at least that long
has passed since the previous attempt, the retry budget resets.
has passed since the previous attempt, the retry budget resets. Emits
a `runnable.RetryEvent` after each failed attempt.

Inside long-running work, always select on both `ctx.Done()` and
`adapters.Stopping(ctx)` — `Stopping` signals drain start, `ctx.Done()`
Expand All @@ -130,6 +131,38 @@ fires only when the drain timer expires.
A full SIGTERM-safe service shape lives in
[`examples/ticker-with-drain`](examples/ticker-with-drain/main.go).

### Observability via Publisher

Adapters emit typed events to a `runnable.Publisher` installed on the
runnable's ctx. Use `runnable.WithPublisher` to register one (or many —
multiple `WithPublisher` calls fan out):

```go
type log struct{}

func (log) Publish(event any) {
switch ev := event.(type) {
case runnable.RetryEvent:
fmt.Printf("retry attempt %d: %v\n", ev.Attempt, ev.Err)
case runnable.DrainStartedEvent:
fmt.Printf("drain started, %s window\n", ev.Timeout)
case runnable.PanicRecoveredEvent:
fmt.Fprintf(os.Stderr, "panic: %v\n%s", ev.Recovered, ev.Stack)
}
}

r := runnable.New(work,
runnable.WithPublisher(log{}),
runnable.WithAdapters(adapters.Retry(3, time.Minute), adapters.Recovering()),
)
```

`StatusStore` is a Publisher too — `WithStatus(id, store)` wires it
automatically and counts `RetryEvent`s into `Status.Restarts`.

`Publisher.Publish` runs on the caller's goroutine, so subscribers must
not block. Buffer internally if you need async dispatch.

### Migrating from v0.1 to v0.2

v0.2 moves drain, ticker, retry, and panic recovery out of the core
Expand All @@ -149,7 +182,7 @@ After (v0.2):

r := runnable.New(doWork, runnable.WithAdapters(
adapters.Draining(10*time.Second),
adapters.Recovering(handler),
adapters.Recovering(),
adapters.Retry(3, time.Minute),
adapters.Ticker(30*time.Second),
))
Expand All @@ -161,9 +194,10 @@ Symbol mapping:
(no longer takes the work argument; pass work to `runnable.New`).
- `runnable.WithRetry` / `runnable.ResetNever` → `adapters.Retry` /
`adapters.ResetNever`.
- `runnable.WithRecoverer` → `adapters.Recovering` with a single
`PanicHandler` callback (the two-interface `RecoveryReporter` /
`StackPrinter` split is gone).
- `runnable.WithRecoverer` → `adapters.Recovering()` plus a
`runnable.WithPublisher` subscriber listening for
`runnable.PanicRecoveredEvent` (the two-interface `RecoveryReporter` /
`StackPrinter` callback split is gone).
- `runnable.Stopping` → `adapters.Stopping`.
- `runnable.ErrDrainTimedOut` → `adapters.ErrDrainTimedOut`.

Expand All @@ -174,10 +208,11 @@ the caller waits for `Stop` to return; the drain runs on its own
fixed-duration timer regardless. If you need a shorter drain budget,
configure `Draining` with the shorter duration.

**Status.Restarts removed.** The `Restarts` field on `Status` counted
`WithRetry` re-entries via the deprecated `onStart` coupling; with
retry moved into adapters it had no clean way to surface. Pending a
proper event/observer hook in a later release.
**Status.Restarts is event-driven.** The `Restarts` field on `Status`
is still present, but it now counts `runnable.RetryEvent`s published
by `adapters.Retry` (or any other Publisher source) rather than
being incremented by an `onStart` side-channel from `WithRetry`. No
call-site change required when using `WithStatus` + `adapters.Retry`.

**NewGroup interaction:** drain-enabled children of `NewGroup` now
drain when the group is stopped (v0.1 silently bypassed the child's
Expand Down
2 changes: 2 additions & 0 deletions adapters/draining.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func Draining(timeout time.Duration) runnable.Adapter {
case err := <-done:
return err
case <-outerCtx.Done():
runnable.Publish(outerCtx, runnable.DrainStartedEvent{Timeout: timeout})
close(stopping)
}

Expand All @@ -65,6 +66,7 @@ func Draining(timeout time.Duration) runnable.Adapter {
case <-timer.C:
cancelWork()
<-done
runnable.Publish(outerCtx, runnable.DrainTimedOutEvent{})
return ErrDrainTimedOut
}
}
Expand Down
55 changes: 55 additions & 0 deletions adapters/draining_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,61 @@ func TestDraining_WorkErrorPropagatesWithoutDrain(t *testing.T) {
require.ErrorIs(t, err, sentinel)
}

func TestDraining_PublishesDrainStarted(t *testing.T) {
pub := &capturingPublisher{}
started := make(chan struct{})

work := func(ctx context.Context) error {
close(started)
<-adapters.Stopping(ctx)
return nil
}

r := runnable.New(work,
runnable.WithPublisher(pub),
runnable.WithAdapters(adapters.Draining(1*time.Second)),
)
go func() { _ = r.Run(context.Background()) }()

<-started
require.NoError(t, r.Stop(context.Background()))

events := pub.snapshot()
require.Len(t, events, 1)
ev, ok := events[0].(runnable.DrainStartedEvent)
require.True(t, ok, "expected DrainStartedEvent, got %T", events[0])
assert.Equal(t, time.Second, ev.Timeout)
}

func TestDraining_PublishesDrainTimedOut(t *testing.T) {
pub := &capturingPublisher{}
started := make(chan struct{})

work := func(ctx context.Context) error {
close(started)
<-ctx.Done() // ignore Stopping; force the timer to fire
return ctx.Err()
}

r := runnable.New(work,
runnable.WithPublisher(pub),
runnable.WithAdapters(adapters.Draining(50*time.Millisecond)),
)
runErr := make(chan error, 1)
go func() { runErr <- r.Run(context.Background()) }()

<-started
require.NoError(t, r.Stop(context.Background()))
require.ErrorIs(t, <-runErr, adapters.ErrDrainTimedOut)

events := pub.snapshot()
require.Len(t, events, 2)
_, ok := events[0].(runnable.DrainStartedEvent)
require.True(t, ok, "first event should be DrainStartedEvent, got %T", events[0])
_, ok = events[1].(runnable.DrainTimedOutEvent)
require.True(t, ok, "second event should be DrainTimedOutEvent, got %T", events[1])
}

func TestDraining_RecoversPanicAsError(t *testing.T) {
// Regression: panics in work run on Draining's spawned goroutine,
// not on the goroutine where outer recover defers live. Without
Expand Down
17 changes: 6 additions & 11 deletions adapters/recovering.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@ import (
"github.com/0xsequence/runnable"
)

// PanicHandler observes a panic caught by Recovering. Runs on next's
// goroutine, so must not block.
type PanicHandler func(ctx context.Context, rec any, stack []byte)

// Recovering returns an Adapter that converts panics from next into
// errors and invokes handler (if non-nil). Place inside Draining when
// both are used, so handler sees the panic before Draining's safety-net
// recovery formats it.
func Recovering(handler PanicHandler) runnable.Adapter {
// errors and publishes a runnable.PanicRecoveredEvent to the Publisher
// on ctx, if any. Place inside Draining when both are used, so the
// Publisher sees the panic before Draining's safety-net recovery
// formats it.
func Recovering() runnable.Adapter {
return func(next runnable.RunFunc) runnable.RunFunc {
return func(ctx context.Context) (err error) {
defer func() {
Expand All @@ -25,9 +22,7 @@ func Recovering(handler PanicHandler) runnable.Adapter {
return
}
stack := debug.Stack()
if handler != nil {
handler(ctx, rec, stack)
}
runnable.Publish(ctx, runnable.PanicRecoveredEvent{Recovered: rec, Stack: stack})
err = fmt.Errorf("adapters: panic: %v", rec)
}()
return next(ctx)
Expand Down
62 changes: 43 additions & 19 deletions adapters/recovering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package adapters_test

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -11,51 +12,74 @@ import (
"github.com/0xsequence/runnable/adapters"
)

func TestRecovering_TurnsPanicIntoError(t *testing.T) {
var captured any
handler := func(_ context.Context, rec any, _ []byte) {
captured = rec
}
type capturingPublisher struct {
mu sync.Mutex
events []any
}

func (c *capturingPublisher) Publish(event any) {
c.mu.Lock()
defer c.mu.Unlock()
c.events = append(c.events, event)
}

func (c *capturingPublisher) snapshot() []any {
c.mu.Lock()
defer c.mu.Unlock()
out := make([]any, len(c.events))
copy(out, c.events)
return out
}

func TestRecovering_TurnsPanicIntoError(t *testing.T) {
work := func(ctx context.Context) error {
panic("boom")
}

r := runnable.New(work, runnable.WithAdapters(adapters.Recovering(handler)))
r := runnable.New(work, runnable.WithAdapters(adapters.Recovering()))
err := r.Run(context.Background())
require.Error(t, err)
assert.Contains(t, err.Error(), "boom")
assert.Equal(t, "boom", captured)
}

func TestRecovering_NilHandlerStillRecovers(t *testing.T) {
func TestRecovering_PublishesPanicEvent(t *testing.T) {
pub := &capturingPublisher{}

work := func(ctx context.Context) error {
panic("boom")
}

r := runnable.New(work, runnable.WithAdapters(adapters.Recovering(nil)))
r := runnable.New(work,
runnable.WithPublisher(pub),
runnable.WithAdapters(adapters.Recovering()),
)
err := r.Run(context.Background())
require.Error(t, err)
assert.Contains(t, err.Error(), "boom")
}

func TestRecovering_PassesThroughOnSuccess(t *testing.T) {
called := false
handler := func(_ context.Context, rec any, _ []byte) {
called = true
}
events := pub.snapshot()
require.Len(t, events, 1)
ev, ok := events[0].(runnable.PanicRecoveredEvent)
require.True(t, ok, "expected PanicRecoveredEvent, got %T", events[0])
assert.Equal(t, "boom", ev.Recovered)
assert.NotEmpty(t, ev.Stack)
}

func TestRecovering_NoPublishOnSuccess(t *testing.T) {
pub := &capturingPublisher{}
work := func(ctx context.Context) error { return nil }

r := runnable.New(work, runnable.WithAdapters(adapters.Recovering(handler)))
r := runnable.New(work,
runnable.WithPublisher(pub),
runnable.WithAdapters(adapters.Recovering()),
)
require.NoError(t, r.Run(context.Background()))
assert.False(t, called, "handler must not fire when next returns normally")
assert.Empty(t, pub.snapshot())
}

func TestRecovering_PassesThroughError(t *testing.T) {
work := func(ctx context.Context) error { return assert.AnError }

r := runnable.New(work, runnable.WithAdapters(adapters.Recovering(nil)))
r := runnable.New(work, runnable.WithAdapters(adapters.Recovering()))
err := r.Run(context.Background())
require.ErrorIs(t, err, assert.AnError)
}
6 changes: 4 additions & 2 deletions adapters/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ const ResetNever time.Duration = 0

// Retry returns an Adapter that re-invokes next up to maxRetries times
// on non-context errors. If resetAfter > 0 and at least that long has
// passed since the previous attempt, the budget resets. Retry does not
// observe Stopping — wrap it inside Draining if you need both.
// passed since the previous attempt, the budget resets. Each failed
// attempt publishes a runnable.RetryEvent (Attempt is 1-indexed). Retry
// does not observe Stopping — wrap it inside Draining if you need both.
func Retry(maxRetries int, resetAfter time.Duration) runnable.Adapter {
return func(next runnable.RunFunc) runnable.RunFunc {
return func(ctx context.Context) error {
Expand All @@ -35,6 +36,7 @@ func Retry(maxRetries int, resetAfter time.Duration) runnable.Adapter {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return err
}
runnable.Publish(ctx, runnable.RetryEvent{Attempt: i + 1, Err: err})
}
return err
}
Expand Down
38 changes: 38 additions & 0 deletions adapters/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,41 @@ func TestRetry_DoesNotRetryContextErrors(t *testing.T) {
require.ErrorIs(t, err, context.Canceled)
assert.Equal(t, int32(1), count.Load())
}

func TestRetry_PublishesEventPerFailedAttempt(t *testing.T) {
pub := &capturingPublisher{}
var count atomic.Int32
work := func(ctx context.Context) error {
count.Add(1)
return assert.AnError
}

r := runnable.New(work,
runnable.WithPublisher(pub),
runnable.WithAdapters(adapters.Retry(3, adapters.ResetNever)),
)
err := r.Run(context.Background())
require.ErrorIs(t, err, assert.AnError)

events := pub.snapshot()
require.Len(t, events, 3, "one RetryEvent per failed attempt")
for i, e := range events {
ev, ok := e.(runnable.RetryEvent)
require.True(t, ok, "event %d: expected RetryEvent, got %T", i, e)
assert.Equal(t, i+1, ev.Attempt, "Attempt is 1-indexed")
assert.ErrorIs(t, ev.Err, assert.AnError)
}
}

func TestRetry_DoesNotPublishOnContextError(t *testing.T) {
pub := &capturingPublisher{}
work := func(ctx context.Context) error { return context.Canceled }

r := runnable.New(work,
runnable.WithPublisher(pub),
runnable.WithAdapters(adapters.Retry(3, adapters.ResetNever)),
)
err := r.Run(context.Background())
require.ErrorIs(t, err, context.Canceled)
assert.Empty(t, pub.snapshot(), "context-error give-up must not emit a retry event")
}
Loading
Loading