Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 120 additions & 15 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ A high-performance, in-memory, lock-free event bus for Go — built on a cache-l
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Fan-out](#fan-out)
- [Batch Handlers](#batch-handlers)
- [Middleware](#middleware)
- [Lifecycle Hooks](#lifecycle-hooks)
- [Back-pressure Policies](#back-pressure-policies)
Expand All @@ -32,6 +33,7 @@ A high-performance, in-memory, lock-free event bus for Go — built on a cache-l

- **Lock-free ring buffer** — atomic operations with cache-line padding keep dispatch sub-microsecond
- **Fan-out dispatch** — register multiple independent handlers per projection; each runs its own middleware chain
- **Batch handlers** — collect up to N events per projection and deliver them as a slice; ideal for bulk DB writes and HTTP batching
- **Sync or async** — flip `store.Async = true` to hand work off to a fixed worker pool
- **Middleware** — wrap handlers with logging, tracing, retries, or any cross-cutting behaviour
- **Lifecycle hooks** — `OnBefore`, `OnAfter`, `OnError` for observability without touching handler logic
Expand Down Expand Up @@ -143,6 +145,103 @@ In async mode each handler invocation becomes its own work item, so the four han

---

## Batch Handlers

`RegisterBatch` collects all events for a projection that accumulate between `Publish` calls and delivers them to the handler as a slice — one call per chunk of up to `size` events. This eliminates per-event overhead for bulk operations like database inserts or HTTP batch APIs.

```go
store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)

store.RegisterBatch("metrics.recorded", 50, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) {
rows := make([]MetricRow, len(evs))
for i, ev := range evs {
rows[i] = ev.Data.(MetricRow)
}
return nil, db.BulkInsert(ctx, rows)
})
```

### Chunking

If more events arrive than the configured `size`, the handler is called multiple times — once per full chunk, then once for the remainder.

```go
// 7 events, size=3 → called with [3 events], [3 events], [1 event]
store.RegisterBatch("tick", 3, handler)
```

### Per-event results

Return a `[]Result` aligned with the input slice to pass per-event results to `OnAfter` hooks. A nil or shorter slice is fine — missing positions are treated as zero `Result` values.

```go
store.RegisterBatch("order.placed", 100, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) {
results := make([]GoEventBus.Result, len(evs))
for i, ev := range evs {
results[i] = GoEventBus.Result{Message: "processed"}
}
return results, nil
})
```

### Fan-out with batch handlers

Multiple batch handlers can be registered for the same projection. Each receives the full chunk independently.

```go
store.RegisterBatch("order.placed", 50, writeToDatabase)
store.RegisterBatch("order.placed", 50, pushToAnalytics)
```

Regular per-event handlers and batch handlers can coexist on the same projection. Both fire on each `Publish` call.

```go
disp.Register("order.placed", auditLogger) // called once per event
store.RegisterBatch("order.placed", 50, bulkWriter) // called once per chunk
```

### Error handling and the DLQ

A batch handler returns a single error for the whole chunk. On error, every event in the failing chunk is sent to the DLQ and `OnError` fires once per event. Chunks that succeeded in the same `Publish` cycle are unaffected.

```go
store.DLQ = GoEventBus.NewDeadLetterQueue()

store.RegisterBatch("write", 25, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) {
if err := db.BulkInsert(ctx, evs); err != nil {
return nil, err // all 25 events land in the DLQ
}
return nil, nil
})
```

### Panics

Panics in a batch handler are recovered and treated identically to a returned error — the chunk lands in the DLQ, `OnError` fires, and the worker pool continues running.

### Lifecycle hooks

`OnBefore`, `OnAfter`, and `OnError` fire **once per event** even for batch handlers, keeping observability consistent regardless of handler type. Middleware is not applied to batch handlers (the signatures are incompatible); use hooks for cross-cutting concerns instead.

### Async mode

Batch handlers participate in the async worker pool when `store.Async = true`. Each chunk is dispatched as one work item.

```go
store.Async = true

store.RegisterBatch("events", 100, bulkWriter)

for i := 0; i < 300; i++ {
_ = store.Subscribe(ctx, GoEventBus.Event{Projection: "events", Data: rows[i]})
}

store.Publish()
_ = store.Drain(context.Background()) // wait for all chunks to complete
```

---

## Middleware

Middleware wraps the handler chain and is applied in the order it is registered. Use it for logging, tracing, timeout injection, or retries.
Expand Down Expand Up @@ -174,7 +273,7 @@ Each handler in a fan-out gets its own independent copy of the middleware chain.

## Lifecycle Hooks

Hooks fire outside the middleware chain and are useful for metrics, structured logging, and alerting without polluting handler code.
Hooks fire outside the middleware chain and are useful for metrics, structured logging, and alerting without polluting handler code. For batch handlers, hooks fire once per event in the chunk rather than once per chunk.

```go
store.OnBefore(func(ctx context.Context, ev GoEventBus.Event) {
Expand Down Expand Up @@ -294,12 +393,12 @@ store.Publish()

### Fan-out and the DLQ

Each handler in a fan-out is independent. If handler A fails and handler B succeeds, only A's invocation produces a dead letter — B is unaffected.
Each handler in a fan-out is independent. If handler A fails and handler B succeeds, only A's invocation produces a dead letter — B is unaffected. The same rule applies to batch handlers: only the failing chunk's events land in the DLQ.

```go
disp.Register("order.placed",
auditLogger, // fails -> one dead letter
invoiceHandler, // succeeds -> no dead letter
auditLogger, // fails -> one dead letter per event
invoiceHandler, // succeeds -> no dead letters
)
```

Expand Down Expand Up @@ -392,9 +491,9 @@ func (d Dispatcher) Register(projection interface{}, handlers ...HandlerFunc)
```go
type Event struct {
ID string
Projection interface{} // key used to look up handlers
Data any // type-safe payload (preferred)
Args map[string]any // legacy payload (deprecated)
Projection interface{} // key used to look up handlers
Data any // type-safe payload (preferred)
Args map[string]any // legacy payload (deprecated)
}
```

Expand All @@ -405,6 +504,14 @@ type HandlerFunc func(context.Context, Event) (Result, error)
type Middleware func(HandlerFunc) HandlerFunc
```

### `BatchHandlerFunc`

```go
type BatchHandlerFunc func(context.Context, []Event) ([]Result, error)
```

Receives a slice of events collected during a `Publish` cycle. Returns one `Result` per input event (may be nil or shorter — missing entries default to zero `Result`) and a single error covering the whole batch. Middleware is not applied; use lifecycle hooks for cross-cutting concerns.

### `Result`

```go
Expand Down Expand Up @@ -448,19 +555,17 @@ Panics if `dispatcher` is nil or `bufferSize` is not a non-zero power of two.
|---|---|
| `Subscribe(ctx, Event) error` | Enqueue an event; applies back-pressure per `OverrunPolicy` |
| `Publish()` | Dispatch all pending events to their handlers |
| `Use(Middleware)` | Append middleware to the chain |
| `OnBefore(BeforeHook)` | Register a hook that runs before each handler |
| `OnAfter(AfterHook)` | Register a hook that runs after each handler |
| `RegisterBatch(projection, size, BatchHandlerFunc)` | Register a batch handler; events are delivered in chunks of up to `size` |
| `Use(Middleware)` | Append middleware to the chain (applied to per-event handlers only) |
| `OnBefore(BeforeHook)` | Register a hook that runs before each handler invocation |
| `OnAfter(AfterHook)` | Register a hook that runs after each handler invocation |
| `OnError(ErrorHook)` | Register a hook that runs when a handler errors |
| `Drain(ctx) error` | Wait for all async handlers; shuts down the worker pool |
| `Close(ctx) error` | Alias for `Drain` |
| `Metrics() (published, processed, errors uint64)` | Snapshot event counters |
| `Metrics() (published, processed, errors uint64)` | Snapshot event counters; `processed` counts individual events, not batch calls |
| `Schedule(ctx, time.Time, Event) *time.Timer` | Fire an event at a time |
| `ScheduleAfter(ctx, time.Duration, Event) *time.Timer` | Fire an event after a duration |

#### `BeginTransaction() *Transaction`

Returns a `*Transaction` scoped to this store.
| `BeginTransaction() *Transaction` | Returns a `*Transaction` scoped to this store |

### `DeadLetter`

Expand Down
93 changes: 93 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package GoEventBus

import (
"context"
"fmt"
"sync/atomic"
"time"
)

// BatchHandlerFunc receives a slice of events at once. It returns one Result per
// input event (the slice may be nil or shorter than events — missing entries are
// treated as zero Results) and a single error covering the whole batch.
// Middleware is not applied to batch handlers; use lifecycle hooks for
// cross-cutting concerns.
type BatchHandlerFunc func(context.Context, []Event) ([]Result, error)

type batchEntry struct {
handler BatchHandlerFunc
size int
}

// RegisterBatch registers h as a batch handler for projection. On each Publish
// call, all pending events for projection are collected and delivered to h in
// chunks of up to size events. Multiple calls for the same projection accumulate
// handlers (fan-out); each handler receives the full chunk independently.
// size must be positive; values ≤ 0 are clamped to 1.
func (es *EventStore) RegisterBatch(projection interface{}, size int, h BatchHandlerFunc) {
if size <= 0 {
size = 1
}
es.batchHandlers[projection] = append(es.batchHandlers[projection], batchEntry{handler: h, size: size})
}

// executeBatch runs a batch handler, fires lifecycle hooks, and routes failures
// to the DLQ. It never panics — panics in the handler are recovered and treated
// as errors, mirroring the behaviour of execute for single-event handlers.
func (es *EventStore) executeBatch(h BatchHandlerFunc, events []Event) {
ctx := context.Background()
for _, ev := range events {
if ev.Ctx != nil {
ctx = ev.Ctx
break
}
}

// Recover panics so the worker pool is never killed by a misbehaving handler.
var (
results []Result
callErr error
)
func() {
defer func() {
if r := recover(); r != nil {
if e, ok := r.(error); ok {
callErr = fmt.Errorf("handler panic: %w", e)
} else {
callErr = fmt.Errorf("handler panic: %v", r)
}
}
}()
for _, ev := range events {
for _, hook := range es.beforeHooks {
hook(ctx, ev)
}
}
results, callErr = h(ctx, events)
}()

atomic.AddUint64(&es.processedCount, uint64(len(events)))

for i, ev := range events {
var res Result
if i < len(results) {
res = results[i]
}
for _, hook := range es.afterHooks {
hook(ctx, ev, res, callErr)
}
}

if callErr != nil {
atomic.AddUint64(&es.errorCount, 1)
now := time.Now()
for _, ev := range events {
if es.DLQ != nil {
es.DLQ.add(DeadLetter{Event: ev, Err: callErr, FailedAt: now, Attempts: 1})
}
for _, hook := range es.errorHooks {
hook(ctx, ev, callErr)
}
}
}
}
Loading
Loading