Skip to content

Commit f944650

Browse files
AchoArnoldCopilot
andcommitted
feat: replace libSQL/Turso with MongoDB Atlas for heartbeat storage
- Add MongoDB Go driver v2 repository implementations for HeartbeatRepository and HeartbeatMonitorRepository interfaces - Create mongodb.go connection helper with Atlas support and index creation - Update DI container to wire MongoDB as the hedging secondary backend - Replace 'turso' case with 'mongodb' case for standalone MongoDB usage - Update integration test docker-compose to use mongo:7 instead of sqld - Update .env.test with MongoDB connection string HEARTBEAT_DB_BACKEND options: - 'hedging': PostgreSQL primary, MongoDB secondary (fail-open writes) - 'mongodb': MongoDB only - default: PostgreSQL only (GORM) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 5ffdcb0 commit f944650

9 files changed

Lines changed: 550 additions & 20 deletions

File tree

api/go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ require (
4747
github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885
4848
github.com/uptrace/uptrace-go v1.43.0
4949
github.com/xuri/excelize/v2 v2.10.1
50+
go.mongodb.org/mongo-driver/v2 v2.6.0
5051
go.opentelemetry.io/otel v1.43.0
5152
go.opentelemetry.io/otel/metric v1.43.0
5253
go.opentelemetry.io/otel/sdk v1.43.0
@@ -164,8 +165,12 @@ require (
164165
github.com/valyala/fasthttp v1.71.0 // indirect
165166
github.com/vanng822/css v1.0.1 // indirect
166167
github.com/vanng822/go-premailer v1.33.0 // indirect
168+
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
169+
github.com/xdg-go/scram v1.2.0 // indirect
170+
github.com/xdg-go/stringprep v1.0.4 // indirect
167171
github.com/xuri/efp v0.0.1 // indirect
168172
github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9 // indirect
173+
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
169174
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
170175
go.opentelemetry.io/contrib v1.43.0 // indirect
171176
go.opentelemetry.io/contrib/detectors/gcp v1.43.0 // indirect

api/go.sum

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,12 @@ github.com/vanng822/css v1.0.1 h1:10yiXc4e8NI8ldU6mSrWmSWMuyWgPr9DZ63RSlsgDw8=
336336
github.com/vanng822/css v1.0.1/go.mod h1:tcnB1voG49QhCrwq1W0w5hhGasvOg+VQp9i9H1rCM1w=
337337
github.com/vanng822/go-premailer v1.33.0 h1:nglIpKn/7e3kIAwYByiH5xpauFur7RwAucqyZ59hcic=
338338
github.com/vanng822/go-premailer v1.33.0/go.mod h1:LGYI7ym6FQ7KcHN16LiQRF+tlan7qwhP1KEhpTINFpo=
339+
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
340+
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
341+
github.com/xdg-go/scram v1.2.0 h1:bYKF2AEwG5rqd1BumT4gAnvwU/M9nBp2pTSxeZw7Wvs=
342+
github.com/xdg-go/scram v1.2.0/go.mod h1:3dlrS0iBaWKYVt2ZfA4cj48umJZ+cAEbR6/SjLA88I8=
343+
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
344+
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
339345
github.com/xuri/efp v0.0.1 h1:fws5Rv3myXyYni8uwj2qKjVaRP30PdjeYe2Y6FDsCL8=
340346
github.com/xuri/efp v0.0.1/go.mod h1:ybY/Jr0T0GTCnYjKqmdwxyxn2BQf2RcQIIvex5QldPI=
341347
github.com/xuri/excelize/v2 v2.10.1 h1:V62UlqopMqha3kOpnlHy2CcRVw1V8E63jFoWUmMzxN0=
@@ -344,11 +350,15 @@ github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9 h1:+C0TIdyyYmzadGaL/HBL
344350
github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9/go.mod h1:WwHg+CVyzlv/TX9xqBFXEZAuxOPxn2k1GNHwG41IIUQ=
345351
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
346352
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
353+
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
354+
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
347355
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
348356
github.com/yuin/goldmark v1.8.2 h1:kEGpgqJXdgbkhcOgBxkC0X0PmoPG1ZyoZ117rDVp4zE=
349357
github.com/yuin/goldmark v1.8.2/go.mod h1:ip/1k0VRfGynBgxOz0yCqHrbZXhcjxyuS66Brc7iBKg=
350358
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
351359
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
360+
go.mongodb.org/mongo-driver/v2 v2.6.0 h1:b9sJOYrkmt4l8bY43ZenFBcPlhYIjaOfYHLtbB/5qi8=
361+
go.mongodb.org/mongo-driver/v2 v2.6.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0=
352362
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
353363
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
354364
go.opentelemetry.io/contrib v1.43.0 h1:rv+pngknCr4qpZDxSpEvEoRioutgfbkk82x6MChJQ3U=

api/pkg/di/container.go

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ import (
7474
"github.com/NdoleStudio/httpsms/pkg/handlers"
7575
"github.com/NdoleStudio/httpsms/pkg/telemetry"
7676
"github.com/NdoleStudio/httpsms/pkg/validators"
77+
mongoDriver "go.mongodb.org/mongo-driver/v2/mongo"
7778
"gorm.io/driver/postgres"
7879
gormLogger "gorm.io/gorm/logger"
7980
)
@@ -84,6 +85,7 @@ type Container struct {
8485
db *gorm.DB
8586
dedicatedDB *gorm.DB
8687
tursoDB *sql.DB
88+
mongoClient *mongoDriver.Client
8789
version string
8890
app *fiber.App
8991
eventDispatcher *services.EventDispatcher
@@ -310,6 +312,23 @@ func (container *Container) TursoDB() *sql.DB {
310312
return container.tursoDB
311313
}
312314

315+
// MongoDB creates a *mongo.Client connection to MongoDB Atlas
316+
func (container *Container) MongoDB() *mongoDriver.Client {
317+
if container.mongoClient != nil {
318+
return container.mongoClient
319+
}
320+
321+
container.logger.Debug("creating MongoDB *mongo.Client connection")
322+
323+
client, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI"))
324+
if err != nil {
325+
container.logger.Fatal(err)
326+
}
327+
328+
container.mongoClient = client
329+
return container.mongoClient
330+
}
331+
313332
// HedgingFailureCounter creates an OTel counter for hedging secondary write failures
314333
func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter {
315334
meter := otel.GetMeterProvider().Meter(
@@ -922,20 +941,20 @@ func (container *Container) MessageThreadRepository() (repository repositories.M
922941
// HeartbeatMonitorRepository creates a new instance of repositories.HeartbeatMonitorRepository
923942
func (container *Container) HeartbeatMonitorRepository() (repository repositories.HeartbeatMonitorRepository) {
924943
switch os.Getenv("HEARTBEAT_DB_BACKEND") {
925-
case "turso":
926-
container.logger.Debug("creating libSQL repositories.HeartbeatMonitorRepository")
927-
return repositories.NewLibsqlHeartbeatMonitorRepository(
944+
case "mongodb":
945+
container.logger.Debug("creating MongoDB repositories.HeartbeatMonitorRepository")
946+
return repositories.NewMongoHeartbeatMonitorRepository(
928947
container.Logger(),
929948
container.Tracer(),
930-
container.TursoDB(),
949+
container.MongoDB(),
931950
)
932951
case "hedging":
933952
container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository")
934953
return repositories.NewHedgingHeartbeatMonitorRepository(
935954
container.Logger(),
936955
container.Tracer(),
937956
repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()),
938-
repositories.NewLibsqlHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.TursoDB()),
957+
repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB()),
939958
container.HedgingFailureCounter(),
940959
)
941960
default:
@@ -1760,20 +1779,20 @@ func (container *Container) RegisterSwaggerRoutes() {
17601779
// HeartbeatRepository registers a new instance of repositories.HeartbeatRepository
17611780
func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository {
17621781
switch os.Getenv("HEARTBEAT_DB_BACKEND") {
1763-
case "turso":
1764-
container.logger.Debug("creating libSQL repositories.HeartbeatRepository")
1765-
return repositories.NewLibsqlHeartbeatRepository(
1782+
case "mongodb":
1783+
container.logger.Debug("creating MongoDB repositories.HeartbeatRepository")
1784+
return repositories.NewMongoHeartbeatRepository(
17661785
container.Logger(),
17671786
container.Tracer(),
1768-
container.TursoDB(),
1787+
container.MongoDB(),
17691788
)
17701789
case "hedging":
17711790
container.logger.Debug("creating hedging repositories.HeartbeatRepository")
17721791
return repositories.NewHedgingHeartbeatRepository(
17731792
container.Logger(),
17741793
container.Tracer(),
17751794
repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()),
1776-
repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB()),
1795+
repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB()),
17771796
container.HedgingFailureCounter(),
17781797
)
17791798
default:
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package repositories
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"go.mongodb.org/mongo-driver/v2/bson"
10+
"go.mongodb.org/mongo-driver/v2/mongo"
11+
12+
"github.com/NdoleStudio/httpsms/pkg/entities"
13+
"github.com/NdoleStudio/httpsms/pkg/telemetry"
14+
"github.com/palantir/stacktrace"
15+
)
16+
17+
// mongoHeartbeatMonitorRepository is responsible for persisting entities.HeartbeatMonitor in MongoDB
18+
type mongoHeartbeatMonitorRepository struct {
19+
logger telemetry.Logger
20+
tracer telemetry.Tracer
21+
collection *mongo.Collection
22+
}
23+
24+
// NewMongoHeartbeatMonitorRepository creates the MongoDB version of the HeartbeatMonitorRepository
25+
func NewMongoHeartbeatMonitorRepository(
26+
logger telemetry.Logger,
27+
tracer telemetry.Tracer,
28+
client *mongo.Client,
29+
) HeartbeatMonitorRepository {
30+
return &mongoHeartbeatMonitorRepository{
31+
logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatMonitorRepository{})),
32+
tracer: tracer,
33+
collection: client.Database(mongoDBName).Collection(collectionHeartbeatMonitors),
34+
}
35+
}
36+
37+
type heartbeatMonitorDocument struct {
38+
ID string `bson:"_id"`
39+
PhoneID string `bson:"phone_id"`
40+
UserID string `bson:"user_id"`
41+
QueueID string `bson:"queue_id"`
42+
Owner string `bson:"owner"`
43+
PhoneOnline bool `bson:"phone_online"`
44+
CreatedAt time.Time `bson:"created_at"`
45+
UpdatedAt time.Time `bson:"updated_at"`
46+
}
47+
48+
func (repository *mongoHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error {
49+
ctx, span := repository.tracer.Start(ctx)
50+
defer span.End()
51+
52+
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
53+
defer cancel()
54+
55+
doc := heartbeatMonitorDocument{
56+
ID: monitor.ID.String(),
57+
PhoneID: monitor.PhoneID.String(),
58+
UserID: string(monitor.UserID),
59+
QueueID: monitor.QueueID,
60+
Owner: monitor.Owner,
61+
PhoneOnline: monitor.PhoneOnline,
62+
CreatedAt: monitor.CreatedAt.UTC(),
63+
UpdatedAt: monitor.UpdatedAt.UTC(),
64+
}
65+
66+
_, err := repository.collection.InsertOne(ctx, doc)
67+
if err != nil {
68+
msg := fmt.Sprintf("cannot save heartbeat monitor with ID [%s]", monitor.ID)
69+
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
70+
}
71+
72+
return nil
73+
}
74+
75+
func (repository *mongoHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) {
76+
ctx, span := repository.tracer.Start(ctx)
77+
defer span.End()
78+
79+
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
80+
defer cancel()
81+
82+
filter := bson.D{
83+
{"user_id", string(userID)},
84+
{"owner", phoneNumber},
85+
}
86+
87+
var doc heartbeatMonitorDocument
88+
err := repository.collection.FindOne(ctx, filter).Decode(&doc)
89+
if err == mongo.ErrNoDocuments {
90+
msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber)
91+
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
92+
}
93+
if err != nil {
94+
msg := fmt.Sprintf("cannot load heartbeat monitor with userID [%s] and owner [%s]", userID, phoneNumber)
95+
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
96+
}
97+
98+
monitor, err := docToHeartbeatMonitor(doc)
99+
if err != nil {
100+
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot convert heartbeat monitor document"))
101+
}
102+
103+
return monitor, nil
104+
}
105+
106+
func (repository *mongoHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) {
107+
ctx, span := repository.tracer.Start(ctx)
108+
defer span.End()
109+
110+
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
111+
defer cancel()
112+
113+
filter := bson.D{
114+
{"user_id", string(userID)},
115+
{"_id", monitorID.String()},
116+
}
117+
118+
count, err := repository.collection.CountDocuments(ctx, filter)
119+
if err != nil {
120+
msg := fmt.Sprintf("cannot check if heartbeat monitor exists with userID [%s] and monitor ID [%s]", userID, monitorID)
121+
return false, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
122+
}
123+
124+
return count > 0, nil
125+
}
126+
127+
func (repository *mongoHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error {
128+
ctx, span := repository.tracer.Start(ctx)
129+
defer span.End()
130+
131+
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
132+
defer cancel()
133+
134+
filter := bson.D{{"_id", monitorID.String()}}
135+
update := bson.D{{"$set", bson.D{
136+
{"queue_id", queueID},
137+
{"updated_at", time.Now().UTC()},
138+
}}}
139+
140+
_, err := repository.collection.UpdateOne(ctx, filter, update)
141+
if err != nil {
142+
msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s]", monitorID)
143+
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
144+
}
145+
146+
return nil
147+
}
148+
149+
func (repository *mongoHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error {
150+
ctx, span := repository.tracer.Start(ctx)
151+
defer span.End()
152+
153+
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
154+
defer cancel()
155+
156+
filter := bson.D{
157+
{"user_id", string(userID)},
158+
{"owner", phoneNumber},
159+
}
160+
161+
_, err := repository.collection.DeleteMany(ctx, filter)
162+
if err != nil {
163+
msg := fmt.Sprintf("cannot delete heartbeat monitor with owner [%s] and userID [%s]", phoneNumber, userID)
164+
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
165+
}
166+
167+
return nil
168+
}
169+
170+
func (repository *mongoHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error {
171+
ctx, span := repository.tracer.Start(ctx)
172+
defer span.End()
173+
174+
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
175+
defer cancel()
176+
177+
filter := bson.D{
178+
{"_id", monitorID.String()},
179+
{"user_id", string(userID)},
180+
}
181+
update := bson.D{{"$set", bson.D{
182+
{"phone_online", online},
183+
{"updated_at", time.Now().UTC()},
184+
}}}
185+
186+
_, err := repository.collection.UpdateOne(ctx, filter, update)
187+
if err != nil {
188+
msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s] for user [%s]", monitorID, userID)
189+
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
190+
}
191+
192+
return nil
193+
}
194+
195+
func (repository *mongoHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
196+
ctx, span := repository.tracer.Start(ctx)
197+
defer span.End()
198+
199+
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
200+
defer cancel()
201+
202+
_, err := repository.collection.DeleteMany(ctx, bson.D{{"user_id", string(userID)}})
203+
if err != nil {
204+
msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID)
205+
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
206+
}
207+
208+
return nil
209+
}
210+
211+
func docToHeartbeatMonitor(doc heartbeatMonitorDocument) (*entities.HeartbeatMonitor, error) {
212+
id, err := uuid.Parse(doc.ID)
213+
if err != nil {
214+
return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor ID [%s]", doc.ID))
215+
}
216+
phoneID, err := uuid.Parse(doc.PhoneID)
217+
if err != nil {
218+
return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor phone ID [%s]", doc.PhoneID))
219+
}
220+
return &entities.HeartbeatMonitor{
221+
ID: id,
222+
PhoneID: phoneID,
223+
UserID: entities.UserID(doc.UserID),
224+
QueueID: doc.QueueID,
225+
Owner: doc.Owner,
226+
PhoneOnline: doc.PhoneOnline,
227+
CreatedAt: doc.CreatedAt,
228+
UpdatedAt: doc.UpdatedAt,
229+
}, nil
230+
}

0 commit comments

Comments
 (0)