Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 59 additions & 48 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ r := runnable.New(func(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
default:
case <-time.After(time.Second):
}
time.Sleep(1 * time.Second)
fmt.Println("Running...")
}
})
Expand All @@ -71,7 +70,8 @@ if err != nil {
### Runnable Function with timeout
```go
fmt.Println("Simple function with timeout...")
ctxWithTimeout, _ := context.WithTimeout(context.Background(), 5*time.Second)
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = runnable.New(func(ctx context.Context) error {
fmt.Println("Starting...")
defer fmt.Println("Stopping...")
Expand All @@ -80,9 +80,8 @@ err = runnable.New(func(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
default:
case <-time.After(time.Second):
}
time.Sleep(1 * time.Second)
fmt.Println("Running...")
}
}).Run(ctxWithTimeout)
Expand Down Expand Up @@ -121,60 +120,72 @@ if err != nil {
}
```

### Drain on Stop
### Adapters

By default, `Stop` cancels the runFunc's context, which aborts in-flight
work. For workers that own external calls that must complete (e.g. an
HTTP request that creates remote state), use `WithDrain` to switch to
"signal-and-wait" semantics: `Stop` closes the channel returned by
`Stopping(ctx)` and waits up to the drain timeout for runFunc to return
on its own. If the timeout elapses, `Stop` falls back to cancelling the
context and returns `ErrDrainTimedOut` once runFunc exits.
Cross-cutting behaviors that aren't part of the core lifecycle live in
the `runnable/adapters` subpackage as runFunc wrappers. They compose by
nesting.

Always select on both `<-ctx.Done()` and `<-runnable.Stopping(ctx)` —
`Stopping` signals only `Stop`; outer-context cancellation still arrives
via `ctx.Done()` and a loop that ignores it will hang.
**Ticker** — periodic execution:

```go
r := runnable.New(func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-runnable.Stopping(ctx):
return nil // finish in-flight work, then return
case <-time.After(time.Second):
doWork(ctx)
}
}
}, runnable.WithDrain(10*time.Second))
r := runnable.New(adapters.Ticker(30*time.Second, reconcile))
```

### Ticker

`NewTicker` wraps the standard "select-loop on a `time.Ticker`" pattern.
It composes with `WithDrain` (let the current tick finish on Stop) and
`WithRecoverer` (catch panics in the tick body).
**Draining** — graceful shutdown with a grace window. When the outer
ctx is cancelled, work has `timeout` to return via `adapters.Stopping(ctx)`
before its ctx is force-cancelled and `adapters.ErrDrainTimedOut` is
returned.

```go
r := runnable.NewTicker(
30*time.Second,
func(ctx context.Context) error {
return reconcile(ctx)
},
runnable.WithDrain(10*time.Second),
)
r := runnable.New(adapters.Draining(10*time.Second,
adapters.Ticker(30*time.Second, reconcile),
))
```

go r.Run(ctx)
Inside long-running work, always select on both `ctx.Done()` and
`adapters.Stopping(ctx)` — `Stopping` signals drain start, `ctx.Done()`
fires only when the drain timer expires.

// On shutdown:
stopCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
r.Stop(stopCtx) // drains the in-flight tick before returning
```
A full SIGTERM-safe service shape lives in
[`examples/ticker-with-drain`](examples/ticker-with-drain/main.go).

### Migrating from v0.1 to v0.2

v0.2 moves drain and ticker out of the core package. The Option-based
`WithDrain` and the `NewTicker` constructor are removed; their
replacements live at `runnable/adapters`.

Before (v0.1):

r := runnable.NewTicker(30*time.Second, doWork,
runnable.WithDrain(10*time.Second),
)

After (v0.2):

r := runnable.New(adapters.Draining(10*time.Second,
adapters.Ticker(30*time.Second, doWork),
))

Symbol mapping:

- `runnable.WithDrain` → use `adapters.Draining` as a runFunc wrapper.
- `runnable.NewTicker` → `adapters.Ticker` wrapped by `runnable.New`.
- `runnable.Stopping` → `adapters.Stopping`.
- `runnable.ErrDrainTimedOut` → `adapters.ErrDrainTimedOut`.

**Behavioral change:** `Stop(ctx)`'s ctx no longer shortens the drain
window. In v0.1, a caller ctx shorter than `WithDrain`'s timeout would
force-cancel mid-drain. In v0.2, `Stop`'s ctx only governs how long
the caller waits for `Stop` to return; the drain runs on its own
fixed-duration timer regardless. If you need a shorter drain budget,
configure `Draining` with the shorter duration.

A full SIGTERM-safe shape (ticker + drain + recoverer + signal.NotifyContext)
lives in [`examples/ticker-with-drain`](examples/ticker-with-drain/main.go).
**NewGroup interaction:** drain-enabled children of `NewGroup` now
drain when the group is stopped (v0.1 silently bypassed the child's
drain). No code change required at call sites — the adapter design
fixes this by construction.

### Runnable Object
```go
Expand Down
12 changes: 12 additions & 0 deletions adapters/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Package adapters provides composable wrappers around the runFunc
// signature (func(context.Context) error). Adapters layer cross-cutting
// behavior such as drain-on-shutdown (Draining) and periodic execution
// (Ticker) without coupling those concerns into the core runnable
// lifecycle.
//
// Adapters compose by nesting:
//
// r := runnable.New(adapters.Draining(10*time.Second,
// adapters.Ticker(time.Second, doWork),
// ))
package adapters
72 changes: 72 additions & 0 deletions adapters/draining.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package adapters

import (
"context"
"errors"
"fmt"
"runtime/debug"
"time"
)

// ErrDrainTimedOut is returned by Draining when work did not exit
// within the drain timeout and was force-cancelled.
var ErrDrainTimedOut = errors.New("adapters: drain timed out")

type stoppingKey struct{}

// Stopping returns a channel that closes when Draining begins shutdown,
// or nil when ctx is not inside a Draining adapter. Always also select
// on ctx.Done() — Stopping signals drain start; ctx.Done() fires only
// when the drain timer expires.
func Stopping(ctx context.Context) <-chan struct{} {
ch, _ := ctx.Value(stoppingKey{}).(<-chan struct{})
return ch
}

// Draining wraps work with graceful-shutdown semantics: when outerCtx
// is cancelled, work has up to timeout to return via Stopping(workCtx)
// before workCtx is force-cancelled and ErrDrainTimedOut is returned.
//
// Panics in work are recovered inside Draining's goroutine and returned
// as an error containing the panic value and stack trace. They do NOT
// propagate to runnable.WithRecoverer — recover only fires on the
// goroutine where the deferred call lives, and work runs on its own.
func Draining(timeout time.Duration, work func(context.Context) error) func(context.Context) error {
return func(outerCtx context.Context) error {
// Decoupled from outerCtx so outer cancellation triggers drain
// rather than aborting work directly.
workCtx, cancelWork := context.WithCancel(context.WithoutCancel(outerCtx))
defer cancelWork()

stopping := make(chan struct{})
workCtx = context.WithValue(workCtx, stoppingKey{}, (<-chan struct{})(stopping))

done := make(chan error, 1)
go func() {
defer func() {
if rec := recover(); rec != nil {
done <- fmt.Errorf("adapters: panic in draining work: %v\n%s", rec, debug.Stack())
}
}()
done <- work(workCtx)
}()

select {
case err := <-done:
return err
case <-outerCtx.Done():
close(stopping)
}

timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case err := <-done:
return err
case <-timer.C:
cancelWork()
<-done
return ErrDrainTimedOut
}
}
}
Loading