@@ -2,12 +2,14 @@ package services
22
33import (
44 "context"
5+ "encoding/base64"
56 "fmt"
67 "strings"
78 "time"
89
910 "github.com/davecgh/go-spew/spew"
1011 "github.com/nyaruka/phonenumbers"
12+ "golang.org/x/sync/errgroup"
1113
1214 "github.com/NdoleStudio/httpsms/pkg/events"
1315 "github.com/NdoleStudio/httpsms/pkg/repositories"
@@ -29,11 +31,13 @@ type ServiceAttachment struct {
2931// MessageService is handles message requests
3032type MessageService struct {
3133 service
32- logger telemetry.Logger
33- tracer telemetry.Tracer
34- eventDispatcher * EventDispatcher
35- phoneService * PhoneService
36- repository repositories.MessageRepository
34+ logger telemetry.Logger
35+ tracer telemetry.Tracer
36+ eventDispatcher * EventDispatcher
37+ phoneService * PhoneService
38+ repository repositories.MessageRepository
39+ attachmentStorage repositories.AttachmentStorage
40+ apiBaseURL string
3741}
3842
3943// NewMessageService creates a new MessageService
@@ -43,13 +47,17 @@ func NewMessageService(
4347 repository repositories.MessageRepository ,
4448 eventDispatcher * EventDispatcher ,
4549 phoneService * PhoneService ,
50+ attachmentStorage repositories.AttachmentStorage ,
51+ apiBaseURL string ,
4652) (s * MessageService ) {
4753 return & MessageService {
48- logger : logger .WithService (fmt .Sprintf ("%T" , s )),
49- tracer : tracer ,
50- repository : repository ,
51- phoneService : phoneService ,
52- eventDispatcher : eventDispatcher ,
54+ logger : logger .WithService (fmt .Sprintf ("%T" , s )),
55+ tracer : tracer ,
56+ repository : repository ,
57+ phoneService : phoneService ,
58+ eventDispatcher : eventDispatcher ,
59+ attachmentStorage : attachmentStorage ,
60+ apiBaseURL : apiBaseURL ,
5361 }
5462}
5563
@@ -314,15 +322,29 @@ func (service *MessageService) ReceiveMessage(ctx context.Context, params *Messa
314322
315323 ctxLogger := service .tracer .CtxLogger (service .logger , span )
316324
325+ messageID := uuid .New ()
326+ var attachmentURLs []string
327+
328+ if len (params .Attachments ) > 0 {
329+ ctxLogger .Info (fmt .Sprintf ("uploading [%d] attachments for message [%s]" , len (params .Attachments ), messageID ))
330+ var err error
331+ attachmentURLs , err = service .uploadAttachments (ctx , params .UserID , messageID , params .Attachments )
332+ if err != nil {
333+ msg := fmt .Sprintf ("cannot upload attachments for message [%s]" , messageID )
334+ return nil , service .tracer .WrapErrorSpan (span , stacktrace .Propagate (err , msg ))
335+ }
336+ }
337+
317338 eventPayload := events.MessagePhoneReceivedPayload {
318- MessageID : uuid .New (),
319- UserID : params .UserID ,
320- Encrypted : params .Encrypted ,
321- Owner : phonenumbers .Format (& params .Owner , phonenumbers .E164 ),
322- Contact : params .Contact ,
323- Timestamp : params .Timestamp ,
324- Content : params .Content ,
325- SIM : params .SIM ,
339+ MessageID : messageID ,
340+ UserID : params .UserID ,
341+ Encrypted : params .Encrypted ,
342+ Owner : phonenumbers .Format (& params .Owner , phonenumbers .E164 ),
343+ Contact : params .Contact ,
344+ Timestamp : params .Timestamp ,
345+ Content : params .Content ,
346+ SIM : params .SIM ,
347+ Attachments : attachmentURLs ,
326348 }
327349
328350 ctxLogger .Info (fmt .Sprintf ("creating cloud event for received with ID [%s]" , eventPayload .MessageID ))
@@ -568,6 +590,7 @@ func (service *MessageService) storeReceivedMessage(ctx context.Context, params
568590 UserID : params .UserID ,
569591 Contact : params .Contact ,
570592 Content : params .Content ,
593+ Attachments : params .Attachments ,
571594 SIM : params .SIM ,
572595 Encrypted : params .Encrypted ,
573596 Type : entities .MessageTypeMobileOriginated ,
@@ -588,6 +611,53 @@ func (service *MessageService) storeReceivedMessage(ctx context.Context, params
588611 return message , nil
589612}
590613
614+ func (service * MessageService ) uploadAttachments (ctx context.Context , userID entities.UserID , messageID uuid.UUID , attachments []ServiceAttachment ) ([]string , error ) {
615+ ctx , span := service .tracer .Start (ctx )
616+ defer span .End ()
617+
618+ ctxLogger := service .tracer .CtxLogger (service .logger , span )
619+
620+ g , gCtx := errgroup .WithContext (ctx )
621+ urls := make ([]string , len (attachments ))
622+ paths := make ([]string , len (attachments ))
623+
624+ for i , attachment := range attachments {
625+ i , attachment := i , attachment
626+ g .Go (func () error {
627+ decoded , err := base64 .StdEncoding .DecodeString (attachment .Content )
628+ if err != nil {
629+ return stacktrace .Propagate (err , fmt .Sprintf ("cannot decode base64 content for attachment [%d]" , i ))
630+ }
631+
632+ sanitizedName := repositories .SanitizeFilename (attachment .Name , i )
633+ ext := repositories .ExtensionFromContentType (attachment .ContentType )
634+ filename := sanitizedName + ext
635+
636+ path := fmt .Sprintf ("attachments/%s/%s/%d/%s" , userID , messageID , i , filename )
637+ paths [i ] = path
638+
639+ if err = service .attachmentStorage .Upload (gCtx , path , decoded ); err != nil {
640+ return stacktrace .Propagate (err , fmt .Sprintf ("cannot upload attachment [%d] to path [%s]" , i , path ))
641+ }
642+
643+ urls [i ] = fmt .Sprintf ("%s/v1/attachments/%s/%s/%d/%s" , service .apiBaseURL , userID , messageID , i , filename )
644+ ctxLogger .Info (fmt .Sprintf ("uploaded attachment [%d] to [%s]" , i , path ))
645+ return nil
646+ })
647+ }
648+
649+ if err := g .Wait (); err != nil {
650+ for _ , path := range paths {
651+ if path != "" {
652+ _ = service .attachmentStorage .Delete (ctx , path )
653+ }
654+ }
655+ return nil , stacktrace .Propagate (err , "cannot upload attachments" )
656+ }
657+
658+ return urls , nil
659+ }
660+
591661// HandleMessageParams are parameters for handling a message event
592662type HandleMessageParams struct {
593663 ID uuid.UUID
0 commit comments