Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
539 changes: 283 additions & 256 deletions README.MD

Large diffs are not rendered by default.

33 changes: 21 additions & 12 deletions eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,15 @@ type BeforeHook func(context.Context, Event)
type AfterHook func(context.Context, Event, Result, error)
type ErrorHook func(context.Context, Event, error)

// Dispatcher maps event projections to handler functions.
type Dispatcher map[interface{}]HandlerFunc
// Dispatcher maps event projections to one or more handler functions.
// Multiple handlers registered for the same projection are called in order (fan-out).
type Dispatcher map[interface{}][]HandlerFunc

// Register appends one or more handlers for the given projection.
// Calling Register multiple times on the same key accumulates handlers.
func (d Dispatcher) Register(projection interface{}, handlers ...HandlerFunc) {
d[projection] = append(d[projection], handlers...)
}

// Event is a unit of work to be dispatched.
type Event struct {
Expand Down Expand Up @@ -211,16 +218,18 @@ func (es *EventStore) Publish() {
continue
}
ev := *p
if handler, ok := disp[ev.Projection]; ok {
if es.Async {
es.wg.Add(1)
select {
case es.workCh <- eventWork{handler, ev}:
case <-es.shutdownSignal:
es.wg.Done()
if handlers, ok := disp[ev.Projection]; ok {
for _, handler := range handlers {
if es.Async {
es.wg.Add(1)
select {
case es.workCh <- eventWork{handler, ev}:
case <-es.shutdownSignal:
es.wg.Done()
}
} else {
es.execute(handler, ev)
}
} else {
es.execute(handler, ev)
}
}
}
Expand Down Expand Up @@ -293,7 +302,7 @@ func (es *EventStore) Schedule(ctx context.Context, t time.Time, e Event) *time.
e.Ctx = ctx
// look up and run the handler directly
disp := *es.dispatcher
if handler, ok := disp[e.Projection]; ok {
for _, handler := range disp[e.Projection] {
es.execute(handler, e)
}
return nil
Expand Down
Loading
Loading