Skip to content

Commit 28500a9

Browse files
committed
Fix event
1 parent 5a64c08 commit 28500a9

8 files changed

Lines changed: 175 additions & 2 deletions

File tree

api/pkg/di/container.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ func NewContainer(projectID string, version string) (container *Container) {
133133
container.RegisterUserListeners()
134134

135135
container.RegisterPhoneRoutes()
136+
container.RegisterPhoneListeners()
136137

137138
container.RegisterEventRoutes()
138139

@@ -769,6 +770,7 @@ func (container *Container) MessageSendScheduleService() *services.MessageSendSc
769770
container.Logger(),
770771
container.Tracer(),
771772
container.MessageSendScheduleRepository(),
773+
container.EventDispatcher(),
772774
)
773775
}
774776

@@ -1455,6 +1457,20 @@ func (container *Container) RegisterPhoneAPIKeyListeners() {
14551457
}
14561458
}
14571459

1460+
// RegisterPhoneListeners registers event listeners for listeners.PhoneListener
1461+
func (container *Container) RegisterPhoneListeners() {
1462+
container.logger.Debug(fmt.Sprintf("registering listeners for %T", listeners.PhoneListener{}))
1463+
_, routes := listeners.NewPhoneListener(
1464+
container.Logger(),
1465+
container.Tracer(),
1466+
container.PhoneService(),
1467+
)
1468+
1469+
for event, handler := range routes {
1470+
container.EventDispatcher().Subscribe(event, handler)
1471+
}
1472+
}
1473+
14581474
// RegisterWebsocketListeners registers event listeners for listeners.WebsocketListener
14591475
func (container *Container) RegisterWebsocketListeners() {
14601476
container.logger.Debug(fmt.Sprintf("registering listeners for %T", listeners.WebsocketListener{}))
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package events
2+
3+
import (
4+
"time"
5+
6+
"github.com/NdoleStudio/httpsms/pkg/entities"
7+
"github.com/google/uuid"
8+
)
9+
10+
// EventTypeMessageSendScheduleDeleted is emitted when a message send schedule is deleted
11+
const EventTypeMessageSendScheduleDeleted = "message-send-schedule.deleted"
12+
13+
// MessageSendScheduleDeletedPayload is the payload of the EventTypeMessageSendScheduleDeleted event
14+
type MessageSendScheduleDeletedPayload struct {
15+
ScheduleID uuid.UUID `json:"schedule_id"`
16+
UserID entities.UserID `json:"user_id"`
17+
Timestamp time.Time `json:"timestamp"`
18+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package listeners
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
cloudevents "github.com/cloudevents/sdk-go/v2"
8+
"github.com/palantir/stacktrace"
9+
10+
"github.com/NdoleStudio/httpsms/pkg/events"
11+
"github.com/NdoleStudio/httpsms/pkg/services"
12+
"github.com/NdoleStudio/httpsms/pkg/telemetry"
13+
)
14+
15+
// PhoneListener handles cloud events that alter the state of entities.Phone
16+
type PhoneListener struct {
17+
logger telemetry.Logger
18+
tracer telemetry.Tracer
19+
service *services.PhoneService
20+
}
21+
22+
// NewPhoneListener creates a new instance of PhoneListener
23+
func NewPhoneListener(
24+
logger telemetry.Logger,
25+
tracer telemetry.Tracer,
26+
service *services.PhoneService,
27+
) (l *PhoneListener, routes map[string]events.EventListener) {
28+
l = &PhoneListener{
29+
logger: logger.WithService(fmt.Sprintf("%T", l)),
30+
tracer: tracer,
31+
service: service,
32+
}
33+
34+
return l, map[string]events.EventListener{
35+
events.EventTypeMessageSendScheduleDeleted: l.onMessageSendScheduleDeleted,
36+
events.UserAccountDeleted: l.onUserAccountDeleted,
37+
}
38+
}
39+
40+
// onMessageSendScheduleDeleted handles the events.EventTypeMessageSendScheduleDeleted event
41+
func (listener *PhoneListener) onMessageSendScheduleDeleted(ctx context.Context, event cloudevents.Event) error {
42+
ctx, span := listener.tracer.Start(ctx)
43+
defer span.End()
44+
45+
var payload events.MessageSendScheduleDeletedPayload
46+
if err := event.DataAs(&payload); err != nil {
47+
msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
48+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
49+
}
50+
51+
if err := listener.service.NullifyScheduleID(ctx, payload.UserID, payload.ScheduleID); err != nil {
52+
msg := fmt.Sprintf("cannot nullify schedule ID [%s] for user [%s] on [%s] event with ID [%s]", payload.ScheduleID, payload.UserID, event.Type(), event.ID())
53+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
54+
}
55+
56+
return nil
57+
}
58+
59+
// onUserAccountDeleted handles the events.UserAccountDeleted event
60+
func (listener *PhoneListener) onUserAccountDeleted(ctx context.Context, event cloudevents.Event) error {
61+
ctx, span := listener.tracer.Start(ctx)
62+
defer span.End()
63+
64+
var payload events.UserAccountDeletedPayload
65+
if err := event.DataAs(&payload); err != nil {
66+
msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
67+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
68+
}
69+
70+
if err := listener.service.DeleteAllForUser(ctx, payload.UserID); err != nil {
71+
msg := fmt.Sprintf("cannot delete all [entities.Phone] for user [%s] on [%s] event with ID [%s]", payload.UserID, event.Type(), event.ID())
72+
return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
73+
}
74+
75+
return nil
76+
}

api/pkg/repositories/gorm_phone_repository.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,25 @@ func (repository *gormPhoneRepository) DeleteAllForUser(ctx context.Context, use
5050
return nil
5151
}
5252

53+
// NullifyScheduleID sets MessageSendScheduleID to NULL for all phones referencing the given schedule
54+
func (repository *gormPhoneRepository) NullifyScheduleID(ctx context.Context, userID entities.UserID, scheduleID uuid.UUID) error {
55+
ctx, span := repository.tracer.Start(ctx)
56+
defer span.End()
57+
58+
err := repository.db.WithContext(ctx).
59+
Model(&entities.Phone{}).
60+
Where("user_id = ?", userID).
61+
Where("message_send_schedule_id = ?", scheduleID).
62+
Update("message_send_schedule_id", nil).Error
63+
if err != nil {
64+
msg := fmt.Sprintf("cannot nullify message_send_schedule_id [%s] for user [%s]", scheduleID, userID)
65+
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
66+
}
67+
68+
repository.cache.Clear()
69+
return nil
70+
}
71+
5372
// LoadByID loads a phone by ID
5473
func (repository *gormPhoneRepository) LoadByID(ctx context.Context, userID entities.UserID, phoneID uuid.UUID) (*entities.Phone, error) {
5574
ctx, span := repository.tracer.Start(ctx)

api/pkg/repositories/phone_repository.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ type PhoneRepository interface {
2525
// Delete an entities.Phone
2626
Delete(ctx context.Context, userID entities.UserID, phoneID uuid.UUID) error
2727

28+
// NullifyScheduleID sets MessageSendScheduleID to NULL for all phones referencing the given schedule
29+
NullifyScheduleID(ctx context.Context, userID entities.UserID, scheduleID uuid.UUID) error
30+
2831
// DeleteAllForUser deletes all entities.Phone for a user
2932
DeleteAllForUser(ctx context.Context, userID entities.UserID) error
3033
}

api/pkg/services/event_dispatcher_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func (dispatcher *EventDispatcher) Publish(ctx context.Context, event cloudevent
148148

149149
dispatcher.meter.Record(
150150
ctx,
151-
float64(time.Since(start).Microseconds())/1000,
151+
float64(time.Since(start).Milliseconds()),
152152
metric.WithAttributes(
153153
semconv.CloudeventsEventType(event.Type()),
154154
semconv.CloudeventsEventSpecVersion(event.SpecVersion()),

api/pkg/services/message_send_schedule_service.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/NdoleStudio/httpsms/pkg/entities"
10+
"github.com/NdoleStudio/httpsms/pkg/events"
1011
"github.com/NdoleStudio/httpsms/pkg/repositories"
1112
"github.com/NdoleStudio/httpsms/pkg/telemetry"
1213
"github.com/google/uuid"
@@ -19,18 +20,21 @@ type MessageSendScheduleService struct {
1920
logger telemetry.Logger
2021
tracer telemetry.Tracer
2122
repository repositories.MessageSendScheduleRepository
23+
dispatcher *EventDispatcher
2224
}
2325

2426
// NewMessageSendScheduleService creates a new MessageSendScheduleService.
2527
func NewMessageSendScheduleService(
2628
logger telemetry.Logger,
2729
tracer telemetry.Tracer,
2830
repository repositories.MessageSendScheduleRepository,
31+
dispatcher *EventDispatcher,
2932
) *MessageSendScheduleService {
3033
return &MessageSendScheduleService{
3134
logger: logger.WithService(fmt.Sprintf("%T", &MessageSendScheduleService{})),
3235
tracer: tracer,
3336
repository: repository,
37+
dispatcher: dispatcher,
3438
}
3539
}
3640

@@ -137,7 +141,30 @@ func (service *MessageSendScheduleService) Delete(
137141
userID entities.UserID,
138142
scheduleID uuid.UUID,
139143
) error {
140-
return service.repository.Delete(ctx, userID, scheduleID)
144+
ctx, span := service.tracer.Start(ctx)
145+
defer span.End()
146+
147+
if err := service.repository.Delete(ctx, userID, scheduleID); err != nil {
148+
msg := fmt.Sprintf("cannot delete message send schedule with ID [%s] for user [%s]", scheduleID, userID)
149+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
150+
}
151+
152+
event, err := service.createEvent(events.EventTypeMessageSendScheduleDeleted, fmt.Sprintf("%T", service), events.MessageSendScheduleDeletedPayload{
153+
ScheduleID: scheduleID,
154+
UserID: userID,
155+
Timestamp: time.Now().UTC(),
156+
})
157+
if err != nil {
158+
msg := fmt.Sprintf("cannot create [%s] event for schedule [%s]", events.EventTypeMessageSendScheduleDeleted, scheduleID)
159+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
160+
}
161+
162+
if err = service.dispatcher.Dispatch(ctx, event); err != nil {
163+
msg := fmt.Sprintf("cannot dispatch [%s] event for schedule [%s]", event.Type(), scheduleID)
164+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
165+
}
166+
167+
return nil
141168
}
142169

143170
// sanitizeWindows normalizes and sorts schedule windows by day and start minute.

api/pkg/services/phone_service.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,20 @@ func (service *PhoneService) DeleteAllForUser(ctx context.Context, userID entiti
5656
return nil
5757
}
5858

59+
// NullifyScheduleID sets MessageSendScheduleID to NULL for all phones referencing the given schedule.
60+
func (service *PhoneService) NullifyScheduleID(ctx context.Context, userID entities.UserID, scheduleID uuid.UUID) error {
61+
ctx, span := service.tracer.Start(ctx)
62+
defer span.End()
63+
64+
if err := service.repository.NullifyScheduleID(ctx, userID, scheduleID); err != nil {
65+
msg := fmt.Sprintf("cannot nullify schedule ID [%s] for user [%s]", scheduleID, userID)
66+
return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
67+
}
68+
69+
service.tracer.CtxLogger(service.logger, span).Info(fmt.Sprintf("nullified schedule ID [%s] on phones for user [%s]", scheduleID, userID))
70+
return nil
71+
}
72+
5973
// Index fetches the heartbeats for a phone number
6074
func (service *PhoneService) Index(ctx context.Context, authUser entities.AuthContext, params repositories.IndexParams) (*[]entities.Phone, error) {
6175
ctx, span := service.tracer.Start(ctx)

0 commit comments

Comments
 (0)