Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 47 additions & 50 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,58 +90,38 @@ if err != nil {
}
```

### Runnable Function with retry
```go
fmt.Println("Simple function with retry...")
errorReturned := false
err = runnable.New(func(ctx context.Context) error {
fmt.Println("Starting...")
defer fmt.Println("Stopping...")

if !errorReturned {
errorReturned = true
return fmt.Errorf("error")
}

// do something
for i := 0; i < 5; i++ {
select {
case <-ctx.Done():
return nil
default:
}
time.Sleep(1 * time.Second)
fmt.Println("Running...")
}
return nil
}, runnable.WithRetry(3, runnable.ResetNever)).Run(context.Background())
if err != nil {
fmt.Println(err)
}
```

### Adapters

Cross-cutting behaviors that aren't part of the core lifecycle live in
the `runnable/adapters` subpackage as runFunc wrappers. They compose by
nesting.

**Ticker** — periodic execution:
the `runnable/adapters` subpackage as chi-style middleware: each
`runnable.Adapter` has the shape `func(next RunFunc) RunFunc`. Apply
them with `runnable.WithAdapters` (left-to-right = outermost-to-innermost):

```go
r := runnable.New(adapters.Ticker(30*time.Second, reconcile))
r := runnable.New(reconcile, runnable.WithAdapters(
adapters.Draining(10*time.Second),
adapters.Recovering(reportPanic),
adapters.Retry(3, time.Minute),
adapters.Ticker(30*time.Second),
))
```

**Draining** — graceful shutdown with a grace window. When the outer
ctx is cancelled, work has `timeout` to return via `adapters.Stopping(ctx)`
before its ctx is force-cancelled and `adapters.ErrDrainTimedOut` is
returned.
ctx is cancelled, the wrapped work has `timeout` to return via
`adapters.Stopping(ctx)` before its ctx is force-cancelled and
`adapters.ErrDrainTimedOut` is returned.

```go
r := runnable.New(adapters.Draining(10*time.Second,
adapters.Ticker(30*time.Second, reconcile),
))
```
**Ticker** — calls the wrapped work once per interval until ctx is
cancelled or the work returns an error. Composes with Draining: an
in-flight tick is allowed to finish before the loop exits.

**Recovering** — turns panics in the wrapped work into errors and
invokes the optional handler before returning. Place inside Draining
when both are in use.

**Retry** — re-invokes the wrapped work up to `maxRetries` times on
non-context errors. If `resetAfter` is non-zero and at least that long
has passed since the previous attempt, the retry budget resets.

Inside long-running work, always select on both `ctx.Done()` and
`adapters.Stopping(ctx)` — `Stopping` signals drain start, `ctx.Done()`
Expand All @@ -152,26 +132,38 @@ A full SIGTERM-safe service shape lives in

### Migrating from v0.1 to v0.2

v0.2 moves drain and ticker out of the core package. The Option-based
`WithDrain` and the `NewTicker` constructor are removed; their
replacements live at `runnable/adapters`.
v0.2 moves drain, ticker, retry, and panic recovery out of the core
package. `WithDrain`, `NewTicker`, `WithRetry`, and `WithRecoverer`
are removed; their replacements live at `runnable/adapters` as
chi-style middleware.

Before (v0.1):

r := runnable.NewTicker(30*time.Second, doWork,
runnable.WithDrain(10*time.Second),
runnable.WithRecoverer(reporter, nil),
runnable.WithRetry(3, time.Minute),
)

After (v0.2):

r := runnable.New(adapters.Draining(10*time.Second,
adapters.Ticker(30*time.Second, doWork),
r := runnable.New(doWork, runnable.WithAdapters(
adapters.Draining(10*time.Second),
adapters.Recovering(handler),
adapters.Retry(3, time.Minute),
adapters.Ticker(30*time.Second),
))

Symbol mapping:

- `runnable.WithDrain` → use `adapters.Draining` as a runFunc wrapper.
- `runnable.NewTicker` → `adapters.Ticker` wrapped by `runnable.New`.
- `runnable.WithDrain` → `adapters.Draining` under `runnable.WithAdapters`.
- `runnable.NewTicker` → `adapters.Ticker` under `runnable.WithAdapters`
(no longer takes the work argument; pass work to `runnable.New`).
- `runnable.WithRetry` / `runnable.ResetNever` → `adapters.Retry` /
`adapters.ResetNever`.
- `runnable.WithRecoverer` → `adapters.Recovering` with a single
`PanicHandler` callback (the two-interface `RecoveryReporter` /
`StackPrinter` split is gone).
- `runnable.Stopping` → `adapters.Stopping`.
- `runnable.ErrDrainTimedOut` → `adapters.ErrDrainTimedOut`.

Expand All @@ -182,6 +174,11 @@ the caller waits for `Stop` to return; the drain runs on its own
fixed-duration timer regardless. If you need a shorter drain budget,
configure `Draining` with the shorter duration.

**Status.Restarts removed.** The `Restarts` field on `Status` counted
`WithRetry` re-entries via the deprecated `onStart` coupling; with
retry moved into adapters it had no clean way to surface. Pending a
proper event/observer hook in a later release.

**NewGroup interaction:** drain-enabled children of `NewGroup` now
drain when the group is stopped (v0.1 silently bypassed the child's
drain). No code change required at call sites — the adapter design
Expand Down
34 changes: 34 additions & 0 deletions adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package runnable

import "context"

// RunFunc is the signature wrapped by runnable.New and by Adapters.
type RunFunc func(ctx context.Context) error

// Adapter wraps a RunFunc with cross-cutting behavior, mirroring the
// chi middleware shape. Adapters live in the runnable/adapters
// subpackage and are applied via WithAdapters.
type Adapter func(next RunFunc) RunFunc

type withAdapters struct {
adapters []Adapter
}

// WithAdapters wraps the runnable's runFunc with the given adapters.
// Adapters are applied left-to-right as outermost-to-innermost:
// WithAdapters(A, B, C) yields A(B(C(runFunc))).
//
// Apply order across Options matters: WithAdapters sees whatever
// runFunc previous Options installed, and subsequent Options see the
// adapter-wrapped runFunc.
func WithAdapters(adapters ...Adapter) Option {
return &withAdapters{adapters: adapters}
}

func (w *withAdapters) apply(r *runnable) {
next := RunFunc(r.runFunc)
for i := len(w.adapters) - 1; i >= 0; i-- {
next = w.adapters[i](next)
}
r.runFunc = next
}
23 changes: 14 additions & 9 deletions adapters/doc.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
// Package adapters provides composable wrappers around the runFunc
// signature (func(context.Context) error). Adapters layer cross-cutting
// behavior such as drain-on-shutdown (Draining) and periodic execution
// (Ticker) without coupling those concerns into the core runnable
// lifecycle.
// Package adapters provides chi-style middleware around the runnable
// RunFunc signature. Each adapter is a config-only constructor that
// returns a runnable.Adapter; compose them via runnable.WithAdapters
// (first listed = outermost wrapper):
//
// Adapters compose by nesting:
//
// r := runnable.New(adapters.Draining(10*time.Second,
// adapters.Ticker(time.Second, doWork),
// r := runnable.New(reconcile, runnable.WithAdapters(
// adapters.Draining(10*time.Second),
// adapters.Recovering(reportPanic),
// adapters.Retry(3, time.Minute),
// adapters.Ticker(time.Second),
// ))
//
// Drain-on-shutdown (Draining), panic-to-error conversion (Recovering),
// retry-on-error (Retry), and periodic execution (Ticker) all live here
// rather than in the core runnable package so they don't couple into
// the core lifecycle.
package adapters
81 changes: 44 additions & 37 deletions adapters/draining.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"runtime/debug"
"time"

"github.com/0xsequence/runnable"
)

// ErrDrainTimedOut is returned by Draining when work did not exit
Expand All @@ -23,50 +25,55 @@ func Stopping(ctx context.Context) <-chan struct{} {
return ch
}

// Draining wraps work with graceful-shutdown semantics: when outerCtx
// is cancelled, work has up to timeout to return via Stopping(workCtx)
// before workCtx is force-cancelled and ErrDrainTimedOut is returned.
// Draining returns an Adapter that adds graceful-shutdown semantics:
// when outerCtx is cancelled, next has up to timeout to return via
// Stopping(workCtx) before workCtx is force-cancelled and
// ErrDrainTimedOut is returned.
//
// Panics in work are recovered inside Draining's goroutine and returned
// Panics in next are recovered inside Draining's goroutine and returned
// as an error containing the panic value and stack trace. They do NOT
// propagate to runnable.WithRecoverer — recover only fires on the
// goroutine where the deferred call lives, and work runs on its own.
func Draining(timeout time.Duration, work func(context.Context) error) func(context.Context) error {
return func(outerCtx context.Context) error {
// Decoupled from outerCtx so outer cancellation triggers drain
// rather than aborting work directly.
workCtx, cancelWork := context.WithCancel(context.WithoutCancel(outerCtx))
defer cancelWork()
// propagate to outer recover handlers — recover only fires on the
// goroutine where the deferred call lives, and next runs on its own.
// Compose with Recovering inside Draining if you need a panic handler
// callback.
func Draining(timeout time.Duration) runnable.Adapter {
return func(next runnable.RunFunc) runnable.RunFunc {
return func(outerCtx context.Context) error {
// Decoupled from outerCtx so outer cancellation triggers drain
// rather than aborting next directly.
workCtx, cancelWork := context.WithCancel(context.WithoutCancel(outerCtx))
defer cancelWork()

stopping := make(chan struct{})
workCtx = context.WithValue(workCtx, stoppingKey{}, (<-chan struct{})(stopping))
stopping := make(chan struct{})
workCtx = context.WithValue(workCtx, stoppingKey{}, (<-chan struct{})(stopping))

done := make(chan error, 1)
go func() {
defer func() {
if rec := recover(); rec != nil {
done <- fmt.Errorf("adapters: panic in draining work: %v\n%s", rec, debug.Stack())
}
done := make(chan error, 1)
go func() {
defer func() {
if rec := recover(); rec != nil {
done <- fmt.Errorf("adapters: panic in draining work: %v\n%s", rec, debug.Stack())
}
}()
done <- next(workCtx)
}()
done <- work(workCtx)
}()

select {
case err := <-done:
return err
case <-outerCtx.Done():
close(stopping)
}
select {
case err := <-done:
return err
case <-outerCtx.Done():
close(stopping)
}

timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case err := <-done:
return err
case <-timer.C:
cancelWork()
<-done
return ErrDrainTimedOut
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case err := <-done:
return err
case <-timer.C:
cancelWork()
<-done
return ErrDrainTimedOut
}
}
}
}
17 changes: 8 additions & 9 deletions adapters/draining_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestDraining_WorkReturnsNaturallyViaStopping(t *testing.T) {
}
}

r := runnable.New(adapters.Draining(1*time.Second, work))
r := runnable.New(work, runnable.WithAdapters(adapters.Draining(1*time.Second)))
runErr := make(chan error, 1)
go func() { runErr <- r.Run(context.Background()) }()

Expand Down Expand Up @@ -63,7 +63,7 @@ func TestDraining_TimerForcesCancelWhenWorkIgnoresStopping(t *testing.T) {
return ctx.Err()
}

r := runnable.New(adapters.Draining(100*time.Millisecond, work))
r := runnable.New(work, runnable.WithAdapters(adapters.Draining(100*time.Millisecond)))
runErr := make(chan error, 1)
go func() { runErr <- r.Run(context.Background()) }()

Expand Down Expand Up @@ -98,7 +98,7 @@ func TestDraining_OuterCtxCancelTriggersDrain(t *testing.T) {
}
}

r := runnable.New(adapters.Draining(1*time.Second, work))
r := runnable.New(work, runnable.WithAdapters(adapters.Draining(1*time.Second)))
ctx, cancel := context.WithCancel(context.Background())
runErr := make(chan error, 1)
go func() { runErr <- r.Run(ctx) }()
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestDraining_ConcurrentStopsPreserveDrainSemantics(t *testing.T) {
}
}

r := runnable.New(adapters.Draining(2*time.Second, work))
r := runnable.New(work, runnable.WithAdapters(adapters.Draining(2*time.Second)))
go func() { _ = r.Run(context.Background()) }()

<-started
Expand Down Expand Up @@ -167,21 +167,20 @@ func TestDraining_WorkErrorPropagatesWithoutDrain(t *testing.T) {
sentinel := errors.New("work failed")
work := func(ctx context.Context) error { return sentinel }

r := runnable.New(adapters.Draining(1*time.Second, work))
r := runnable.New(work, runnable.WithAdapters(adapters.Draining(1*time.Second)))
err := r.Run(context.Background())
require.ErrorIs(t, err, sentinel)
}

func TestDraining_RecoversPanicAsError(t *testing.T) {
// Regression: panics in work run on Draining's spawned goroutine,
// not on the goroutine where runnable.WithRecoverer's defer lives.
// Without internal recovery, a tick panic would crash the process.
// Draining must catch it and surface as an error.
// not on the goroutine where outer recover defers live. Without
// internal recovery, a tick panic would crash the process.
work := func(ctx context.Context) error {
panic("boom")
}

r := runnable.New(adapters.Draining(1*time.Second, work))
r := runnable.New(work, runnable.WithAdapters(adapters.Draining(1*time.Second)))
err := r.Run(context.Background())
require.Error(t, err)
assert.Contains(t, err.Error(), "boom", "panic value should be embedded in error")
Expand Down
Loading