Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,18 @@ type EventStore struct {
publishedCount uint64
processedCount uint64
errorCount uint64

// txMu serialises Rollback against concurrent Subscribe calls.
txMu sync.Mutex
}

// NewEventStore initializes a new EventStore. It spins up a default worker pool.
func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPolicy) *EventStore {
if bufferSize&(bufferSize-1) != 0 {
panic("bufferSize must be a power of two")
if dispatcher == nil {
panic("GoEventBus: dispatcher must not be nil")
}
if bufferSize == 0 || bufferSize&(bufferSize-1) != 0 {
panic("GoEventBus: bufferSize must be a non-zero power of two")
}
es := &EventStore{
dispatcher: dispatcher,
Expand All @@ -122,8 +128,15 @@ func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPoli
// worker processes eventWork from the channel until shutdown.
func (es *EventStore) worker() {
for w := range es.workCh {
es.execute(w.handler, w.ev)
es.wg.Done()
func(work eventWork) {
defer func() {
if r := recover(); r != nil {
atomic.AddUint64(&es.errorCount, 1)
}
es.wg.Done()
}()
es.execute(work.handler, work.ev)
}(w)
}
}

Expand Down Expand Up @@ -216,15 +229,10 @@ func (es *EventStore) Publish() {

// execute runs the handler with middleware and hooks.
func (es *EventStore) execute(h HandlerFunc, ev Event) {
// pick up recorded context
ctx := ev.Ctx
if ctx == nil {
ctx = context.Background()
}
// override with explicit __ctx if set in legacy Args
if c, ok := ev.Args["__ctx"].(context.Context); ok && c != nil {
ctx = c
}

for _, hook := range es.beforeHooks {
hook(ctx, ev)
Expand Down
62 changes: 58 additions & 4 deletions eventstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,12 +485,11 @@ func BenchmarkSubscribePublish(b *testing.B) {
b.Run("Async", func(b *testing.B) { bench(true) })
}

// TestContextPropagation verifies that a context injected through Args["__ctx"]
// TestContextPropagation verifies that a context passed to Subscribe
// is forwarded to the handler.
func TestContextPropagation(t *testing.T) {
const key, val = "myKey", "myVal"

// prepare a context with a distinctive value
ctx := context.WithValue(context.Background(), key, val)

var received string
Expand All @@ -504,8 +503,7 @@ func TestContextPropagation(t *testing.T) {
}

es := NewEventStore(&disp, 8, DropOldest)
// inject ctx via the reserved "__ctx" argument key
_ = es.Subscribe(context.Background(), Event{ID: "1", Projection: "ctx", Args: map[string]any{"__ctx": ctx}})
_ = es.Subscribe(ctx, Event{ID: "1", Projection: "ctx"})
es.Publish()

if received != val {
Expand Down Expand Up @@ -869,3 +867,59 @@ func TestSchedule_Cancel(t *testing.T) {
t.Fatal("handler fired despite cancellation")
}
}

// TestWorkerRecoverFromPanic verifies that a panicking handler does not kill the worker
// goroutine and that subsequent events are still processed.
func TestWorkerRecoverFromPanic(t *testing.T) {
var good uint32
disp := Dispatcher{
"panic": func(_ context.Context, ev Event) (Result, error) {
panic("intentional panic")
},
"ok": func(_ context.Context, ev Event) (Result, error) {
atomic.AddUint32(&good, 1)
return Result{}, nil
},
}
es := NewEventStore(&disp, 16, DropOldest)
es.Async = true

// publish a panicking event followed by a good event
_ = es.Subscribe(context.Background(), Event{Projection: "panic"})
_ = es.Subscribe(context.Background(), Event{Projection: "ok"})
es.Publish()

drainCtx, drainCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer drainCancel()
if err := es.Drain(drainCtx); err != nil {
t.Fatalf("Drain failed: %v", err)
}
if atomic.LoadUint32(&good) != 1 {
t.Fatalf("expected good handler to run once; got %d", good)
}
_, _, errs := es.Metrics()
if errs != 1 {
t.Fatalf("expected error counter == 1 (for the panic); got %d", errs)
}
}

// TestNewEventStore_NilDispatcherPanics verifies fail-fast on nil dispatcher.
func TestNewEventStore_NilDispatcherPanics(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Fatal("expected panic for nil dispatcher, got none")
}
}()
NewEventStore(nil, 8, DropOldest)
}

// TestNewEventStore_ZeroBufferPanics verifies fail-fast on zero buffer size.
func TestNewEventStore_ZeroBufferPanics(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Fatal("expected panic for zero buffer size, got none")
}
}()
disp := Dispatcher{}
NewEventStore(&disp, 0, DropOldest)
}
15 changes: 7 additions & 8 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ func (tx *Transaction) Commit(ctx context.Context) error {
}
ev := *evPtr
if handler, ok := disp[ev.Projection]; ok {
// pick up recorded context
cctx := ev.Ctx
if c2, ok2 := ev.Args["__ctx"].(context.Context); ok2 && c2 != nil {
cctx = c2
if cctx == nil {
cctx = ctx
}

// before hooks
Expand Down Expand Up @@ -97,18 +96,18 @@ func (tx *Transaction) Commit(ctx context.Context) error {

// Rollback clears the local buffer *and* any events that have already
// been pushed into the store’s ring-buffer since the transaction began.
// It holds txMu for the duration to prevent concurrent Subscribe calls
// from advancing head between the snapshot and the head reset.
func (tx *Transaction) Rollback() {
// clear our local buffer
tx.events = tx.events[:0]

// remove any partial enqueues from the store
tx.store.txMu.Lock()
defer tx.store.txMu.Unlock()

currHead := atomic.LoadUint64(&tx.store.head)
mask := tx.store.size - 1
for i := tx.startHead; i < currHead; i++ {
// clear slot atomically
tx.store.buf[i&mask].Store(nil)
}

// restore the head pointer
atomic.StoreUint64(&tx.store.head, tx.startHead)
}
Loading