Skip to content

Commit 5351ef9

Browse files
AchoArnoldCopilot
andcommitted
feat(services): add rate-based dispatch delay and ExactSendTime to SendMessage
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 4250988 commit 5351ef9

1 file changed

Lines changed: 21 additions & 13 deletions

File tree

api/pkg/services/message_service.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ type MessageSendParams struct {
461461
RequestID *string
462462
UserID entities.UserID
463463
RequestReceivedAt time.Time
464+
Index int
464465
}
465466

466467
// SendMessage a new message
@@ -470,7 +471,7 @@ func (service *MessageService) SendMessage(ctx context.Context, params MessageSe
470471

471472
ctxLogger := service.tracer.CtxLogger(service.logger, span)
472473

473-
sendAttempts, sim := service.phoneSettings(ctx, params.UserID, phonenumbers.Format(params.Owner, phonenumbers.E164))
474+
sendAttempts, sim, messagesPerMinute := service.phoneSettings(ctx, params.UserID, phonenumbers.Format(params.Owner, phonenumbers.E164))
474475

475476
eventPayload := events.MessageAPISentPayload{
476477
MessageID: uuid.New(),
@@ -484,6 +485,7 @@ func (service *MessageService) SendMessage(ctx context.Context, params MessageSe
484485
Content: params.Content,
485486
Attachments: params.Attachments,
486487
ScheduledSendTime: params.SendAt,
488+
ExactSendTime: params.SendAt != nil,
487489
SIM: sim,
488490
}
489491

@@ -500,7 +502,7 @@ func (service *MessageService) SendMessage(ctx context.Context, params MessageSe
500502
return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
501503
}
502504

503-
timeout := service.getSendDelay(ctxLogger, eventPayload, params.SendAt)
505+
timeout := service.getSendDelay(ctxLogger, eventPayload, params, messagesPerMinute)
504506
if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, timeout); err != nil {
505507
msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
506508
return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
@@ -559,18 +561,24 @@ func (service *MessageService) RegisterMissedCall(ctx context.Context, params *M
559561
return message, err
560562
}
561563

562-
func (service *MessageService) getSendDelay(ctxLogger telemetry.Logger, eventPayload events.MessageAPISentPayload, sendAt *time.Time) time.Duration {
563-
if sendAt == nil {
564-
return time.Duration(0)
564+
func (service *MessageService) getSendDelay(ctxLogger telemetry.Logger, eventPayload events.MessageAPISentPayload, params MessageSendParams, messagesPerMinute uint) time.Duration {
565+
if params.SendAt != nil {
566+
delay := params.SendAt.Sub(time.Now().UTC())
567+
if delay < 0 {
568+
ctxLogger.Info(fmt.Sprintf("message [%s] has send time [%s] in the past. sending immediately", eventPayload.MessageID, params.SendAt.String()))
569+
return time.Duration(0)
570+
}
571+
return delay
565572
}
566573

567-
delay := sendAt.Sub(time.Now().UTC())
568-
if delay < 0 {
569-
ctxLogger.Info(fmt.Sprintf("message [%s] has send time [%s] in the past. sending immediately", eventPayload.MessageID, sendAt.String()))
570-
return time.Duration(0)
574+
if params.Index > 0 && messagesPerMinute > 0 {
575+
interval := time.Minute / time.Duration(messagesPerMinute)
576+
delay := time.Duration(params.Index) * interval
577+
ctxLogger.Info(fmt.Sprintf("message [%s] bulk index [%d] rate-based delay [%s]", eventPayload.MessageID, params.Index, delay))
578+
return delay
571579
}
572580

573-
return delay
581+
return time.Duration(0)
574582
}
575583

576584
// StoreReceivedMessage a new message
@@ -1011,7 +1019,7 @@ func (service *MessageService) SearchMessages(ctx context.Context, params *Messa
10111019
return messages, nil
10121020
}
10131021

1014-
func (service *MessageService) phoneSettings(ctx context.Context, userID entities.UserID, owner string) (uint, entities.SIM) {
1022+
func (service *MessageService) phoneSettings(ctx context.Context, userID entities.UserID, owner string) (uint, entities.SIM, uint) {
10151023
ctx, span := service.tracer.Start(ctx)
10161024
defer span.End()
10171025

@@ -1021,10 +1029,10 @@ func (service *MessageService) phoneSettings(ctx context.Context, userID entitie
10211029
if err != nil {
10221030
msg := fmt.Sprintf("cannot load phone for userID [%s] and owner [%s]. using default max send attempt of 2", userID, owner)
10231031
ctxLogger.Error(stacktrace.Propagate(err, msg))
1024-
return 2, entities.SIM1
1032+
return 2, entities.SIM1, 0
10251033
}
10261034

1027-
return phone.MaxSendAttemptsSanitized(), phone.SIM
1035+
return phone.MaxSendAttemptsSanitized(), phone.SIM, phone.MessagesPerMinute
10281036
}
10291037

10301038
// storeSentMessage a new message

0 commit comments

Comments
 (0)