117 changes: 117 additions & 0 deletions README.MD
@@ -18,6 +18,7 @@ A high-performance, in-memory, lock-free event bus for Go — built on a cache-l
- [Lifecycle Hooks](#lifecycle-hooks)
- [Back-pressure Policies](#back-pressure-policies)
- [Async Mode](#async-mode)
- [Dead Letter Queue](#dead-letter-queue)
- [Transactions](#transactions)
- [Scheduling](#scheduling)
- [API Reference](#api-reference)
@@ -35,6 +36,7 @@ A high-performance, in-memory, lock-free event bus for Go — built on a cache-l
- **Middleware** — wrap handlers with logging, tracing, retries, or any cross-cutting behaviour
- **Lifecycle hooks** — `OnBefore`, `OnAfter`, `OnError` for observability without touching handler logic
- **Back-pressure** — choose `DropOldest`, `Block`, or `ReturnError` per store
- **Dead letter queue** — failed and panicking events are routed to an inspectable, replayable secondary store instead of being silently counted
- **Transactions** — batch events and commit or roll back as a unit
- **Scheduling** — fire events at a future `time.Time` or after a `time.Duration`
- **Metrics** — published, processed, and error counters via a single call
@@ -237,6 +239,88 @@ if err := store.Drain(ctx); err != nil {

---

## Dead Letter Queue

Attach a `DeadLetterQueue` to an `EventStore` and every event whose handler returns an error **or panics** is captured there instead of being silently swallowed into the error counter. Panics are wrapped as errors with full `errors.Is` / `errors.As` support when the original panic value was itself an `error`.

```go
store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)
store.DLQ = GoEventBus.NewDeadLetterQueue()
```

### Inspecting failures

```go
store.Publish()

for _, dl := range store.DLQ.Entries() {
	fmt.Printf("event=%s err=%v attempts=%d failed=%s\n",
		dl.Event.ID, dl.Err, dl.Attempts, dl.FailedAt.Format(time.RFC3339))
}
```

`Entries()` returns a snapshot copy — mutations to the slice do not affect the queue.
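
The snapshot behaviour can be illustrated with a minimal, self-contained sketch. The `entry` and `queue` types below are simplified stand-ins invented for this example, not the library's `DeadLetter` and `DeadLetterQueue`; only the copy-under-lock idea is taken from `dlq.go`.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is a hypothetical, simplified stand-in for DeadLetter.
type entry struct{ ID string }

// queue is a hypothetical stand-in for DeadLetterQueue.
type queue struct {
	mu      sync.Mutex
	entries []entry
}

func (q *queue) add(e entry) {
	q.mu.Lock()
	q.entries = append(q.entries, e)
	q.mu.Unlock()
}

// snapshot copies the entries while holding the lock, so the caller
// receives a slice with its own backing array.
func (q *queue) snapshot() []entry {
	q.mu.Lock()
	defer q.mu.Unlock()
	out := make([]entry, len(q.entries))
	copy(out, q.entries)
	return out
}

func (q *queue) size() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.entries)
}

func main() {
	q := &queue{}
	q.add(entry{ID: "a"})
	q.add(entry{ID: "b"})

	snap := q.snapshot()
	snap[0].ID = "changed" // only the copy is modified

	fmt.Println(q.snapshot()[0].ID, q.size()) // prints "a 2"
}
```

Note that `copy` is shallow: if an entry held maps or pointers, the snapshot would still share those with the queue.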

### Draining

```go
failed := store.DLQ.Drain() // empties the queue and returns all entries
```

### Replaying

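`Replay` re-subscribes all entries and, if every re-subscription succeeds, calls `Publish` once afterwards. Each entry has its `Attempts` counter incremented. Entries that fail to re-enqueue (e.g. buffer full) are kept in the queue with `Err` replaced by the `Subscribe` error, and the first such error is returned. In that case `Publish` is not called, so entries that did re-enqueue wait in the store until the next `Publish`.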

```go
if err := store.DLQ.Replay(ctx, store); err != nil {
	log.Printf("replay partially failed: %v", err)
}
```
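
To make the partial-failure case concrete, here is a rough simulation of the control flow of `Replay` as it appears in `dlq.go` in this change, without the locking. `boundedStore`, `deadLetter`, `errFull`, and `replay` are hypothetical stand-ins written for this sketch; the real `EventStore` is not used.

```go
package main

import (
	"errors"
	"fmt"
)

// deadLetter is a simplified stand-in for DeadLetter.
type deadLetter struct {
	ID       string
	Err      error
	Attempts int
}

var errFull = errors.New("buffer full")

// boundedStore is a hypothetical stand-in for EventStore that rejects
// new events once capacity is reached.
type boundedStore struct {
	capacity  int
	pending   []string
	published int
}

func (s *boundedStore) subscribe(id string) error {
	if len(s.pending) >= s.capacity {
		return errFull
	}
	s.pending = append(s.pending, id)
	return nil
}

func (s *boundedStore) publish() {
	s.published += len(s.pending)
	s.pending = nil
}

// replay follows the same steps as Replay in dlq.go: bump Attempts,
// try to re-enqueue, keep failures, publish only if nothing failed.
func replay(entries []deadLetter, s *boundedStore) (kept []deadLetter, firstErr error) {
	for _, dl := range entries {
		dl.Attempts++
		if err := s.subscribe(dl.ID); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			dl.Err = err
			kept = append(kept, dl)
		}
	}
	if firstErr == nil {
		s.publish()
	}
	return kept, firstErr
}

func main() {
	s := &boundedStore{capacity: 1}
	in := []deadLetter{{ID: "a", Attempts: 1}, {ID: "b", Attempts: 1}, {ID: "c", Attempts: 1}}

	kept, err := replay(in, s)
	fmt.Println(len(kept), err, s.published, len(s.pending)) // prints "2 buffer full 0 1"
}
```

With room for only one event, two entries stay in the queue with `Attempts` at 2, and the one that was accepted remains unpublished until the caller publishes again.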

A common pattern is to gate replays on `Attempts` to avoid infinite retry loops:

```go
const maxAttempts = 3

for _, dl := range store.DLQ.Drain() {
	if dl.Attempts >= maxAttempts {
		log.Printf("dropping %s after %d attempts: %v", dl.Event.ID, dl.Attempts, dl.Err)
		continue
	}
	_ = store.Subscribe(ctx, dl.Event)
}
store.Publish()
```

### Fan-out and the DLQ

Each handler in a fan-out is independent. If handler A fails and handler B succeeds, only A's invocation produces a dead letter — B is unaffected.

```go
disp.Register("order.placed",
	auditLogger,    // fails -> one dead letter
	invoiceHandler, // succeeds -> no dead letter
)
```
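
The per-handler isolation described above could look roughly like the following self-contained sketch. The `handler` and `deadLetter` types and the `dispatch` loop are assumptions made for illustration; they are not the library's dispatcher.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical, simplified handler signature.
type handler func(event string) error

// deadLetter is a simplified stand-in for DeadLetter.
type deadLetter struct {
	Event string
	Err   error
}

// invoices counts successful invoiceHandler runs, so the example can
// show that a failing sibling did not prevent it from running.
var invoices int

func auditLogger(event string) error    { return errors.New("audit log unavailable") }
func invoiceHandler(event string) error { invoices++; return nil }

// dispatch runs every handler for the event. A failure is recorded as a
// dead letter for that invocation only; the loop carries on regardless.
func dispatch(event string, handlers []handler) []deadLetter {
	var dead []deadLetter
	for _, h := range handlers {
		if err := h(event); err != nil {
			dead = append(dead, deadLetter{Event: event, Err: err})
		}
	}
	return dead
}

func main() {
	dead := dispatch("order.placed", []handler{auditLogger, invoiceHandler})
	fmt.Println(len(dead), dead[0].Err, invoices) // prints "1 audit log unavailable 1"
}
```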

### Panic recovery

The DLQ also catches handler panics. In sync and async modes alike, the panic is converted to an error, routed to the DLQ and error hooks, and execution continues. The worker pool is never killed by a misbehaving handler.

```go
disp.Register("risky", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) {
	panic("something went wrong") // caught, not fatal
})

store.DLQ = GoEventBus.NewDeadLetterQueue()
store.Publish()

dl := store.DLQ.Entries()[0]
fmt.Println(dl.Err) // "handler panic: something went wrong"
```
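
One way such panic-to-error conversion can be written is shown below. This is a sketch under assumptions, not the library's code: `safeCall` and `errBoom` are hypothetical names, and only the `handler panic: ` prefix is taken from the example output above. Wrapping with `%w` is what keeps `errors.Is` and `errors.As` working when the panic value is itself an `error`.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a hypothetical sentinel error used only in this example.
var errBoom = errors.New("boom")

// safeCall runs fn and converts a panic into a returned error.
// An error panic value is wrapped with %w so it stays discoverable via
// errors.Is / errors.As; any other value is formatted with %v.
func safeCall(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			if e, ok := r.(error); ok {
				err = fmt.Errorf("handler panic: %w", e)
				return
			}
			err = fmt.Errorf("handler panic: %v", r)
		}
	}()
	return fn()
}

func main() {
	err := safeCall(func() error { panic(errBoom) })
	fmt.Println(err, errors.Is(err, errBoom)) // prints "handler panic: boom true"

	err = safeCall(func() error { panic("something went wrong") })
	fmt.Println(err) // prints "handler panic: something went wrong"
}
```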

---

## Transactions

Group multiple events into a single commit. If any handler returns an error, `Commit` stops and returns that error. Call `Rollback` to discard buffered events without publishing.
@@ -351,6 +435,13 @@ func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPoli

Panics if `dispatcher` is nil or `bufferSize` is not a non-zero power of two.

**Fields**

| Field | Type | Description |
|---|---|---|
| `Async` | `bool` | Enable async worker pool dispatch |
| `DLQ` | `*DeadLetterQueue` | Optional dead letter queue; `nil` by default |

#### Methods

| Method | Description |
@@ -371,6 +462,32 @@ Panics if `dispatcher` is nil or `bufferSize` is not a non-zero power of two.

Returns a `*Transaction` scoped to this store.

### `DeadLetter`

```go
type DeadLetter struct {
	Event    Event
	Err      error // handler error, or a wrapped panic value
	FailedAt time.Time
	Attempts int // 1 on first failure; incremented on each Replay call
}
```

### `DeadLetterQueue`

```go
func NewDeadLetterQueue() *DeadLetterQueue
```

| Method | Description |
|---|---|
| `Len() int` | Number of entries currently in the queue |
| `Entries() []DeadLetter` | Snapshot copy — safe to iterate without holding a lock |
| `Drain() []DeadLetter` | Remove and return all entries, leaving the queue empty |
| `Replay(ctx, store) error` | Re-subscribe all entries, call `Publish` if none failed to re-enqueue, keep failures in queue |

Attach to a store via `store.DLQ = GoEventBus.NewDeadLetterQueue()`. When `DLQ` is `nil` (default) behaviour is unchanged.

### `Transaction`

| Method | Description |
96 changes: 96 additions & 0 deletions dlq.go
@@ -0,0 +1,96 @@
package GoEventBus

import (
	"context"
	"sync"
	"time"
)

// DeadLetter holds a failed event and the reason it could not be processed.
type DeadLetter struct {
	Event    Event
	Err      error // handler error, or a wrapped panic value
	FailedAt time.Time
	Attempts int // 1 on first failure; incremented on each Replay call
}

// DeadLetterQueue is a thread-safe store for events that failed during dispatch.
// Attach one to an EventStore via store.DLQ to enable dead-letter routing.
//
//	store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)
//	store.DLQ = GoEventBus.NewDeadLetterQueue()
type DeadLetterQueue struct {
	mu      sync.Mutex
	entries []DeadLetter
}

// NewDeadLetterQueue returns an empty DeadLetterQueue ready for use.
func NewDeadLetterQueue() *DeadLetterQueue {
	return &DeadLetterQueue{}
}

func (q *DeadLetterQueue) add(dl DeadLetter) {
	q.mu.Lock()
	q.entries = append(q.entries, dl)
	q.mu.Unlock()
}

// Len returns the number of dead letters currently in the queue.
func (q *DeadLetterQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.entries)
}

// Entries returns a snapshot copy of all dead letters without removing them.
func (q *DeadLetterQueue) Entries() []DeadLetter {
	q.mu.Lock()
	defer q.mu.Unlock()
	out := make([]DeadLetter, len(q.entries))
	copy(out, q.entries)
	return out
}

// Drain removes and returns all dead letters, leaving the queue empty.
func (q *DeadLetterQueue) Drain() []DeadLetter {
	q.mu.Lock()
	defer q.mu.Unlock()
	out := q.entries
	q.entries = nil
	return out
}

// Replay re-enqueues all dead letters into store for reprocessing. Each entry
// has its Attempts incremented before re-subscribing. Entries that fail to
// re-enqueue (e.g. buffer full) are kept in the queue with Err set to the
// Subscribe error. store.Publish() is called once afterwards, but only if
// every entry re-enqueued successfully. Returns the first Subscribe error
// encountered, or nil.
func (q *DeadLetterQueue) Replay(ctx context.Context, store *EventStore) error {
	q.mu.Lock()
	entries := q.entries
	q.entries = nil
	q.mu.Unlock()

	var failed []DeadLetter
	var firstErr error
	for _, dl := range entries {
		dl.Attempts++
		if err := store.Subscribe(ctx, dl.Event); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			dl.Err = err
			failed = append(failed, dl)
		}
	}

	if len(failed) > 0 {
		q.mu.Lock()
		q.entries = append(failed, q.entries...)
		q.mu.Unlock()
	}

	if firstErr == nil {
		store.Publish()
	}
	return firstErr
}