Skip to content

Commit ff37607

Browse files
committed
Add retries for saving message cost to db and updating user cost. But throw error if it doesn't save
1 parent cda0250 commit ff37607

File tree

5 files changed

+121
-118
lines changed

5 files changed

+121
-118
lines changed

backend/src/__tests__/credit-conversion.test.ts

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,10 @@
1-
import { getUserCostPerCredit } from '@codebuff/billing'
21
import {
32
convertCreditsToUsdCents,
43
convertStripeGrantAmountToCredits,
54
} from '@codebuff/common/util/currency'
65
import { describe, it, expect } from 'bun:test'
76

87
describe('Credit Conversion System', () => {
9-
describe('getUserCostPerCredit', () => {
10-
it('should return 1 cent per credit for all users currently', async () => {
11-
const cost = await getUserCostPerCredit('test-user')
12-
expect(cost).toBe(1)
13-
})
14-
15-
it('should return 1 cent per credit for test user', async () => {
16-
const cost = await getUserCostPerCredit('test-user-undefined-case')
17-
expect(cost).toBe(1)
18-
})
19-
})
20-
218
describe('convertCreditsToUsdCents', () => {
229
it('should convert credits to cents correctly', () => {
2310
expect(convertCreditsToUsdCents(100, 1)).toBe(100) // 100 credits at 1¢ each

backend/src/llm-apis/message-cost-tracker.ts

Lines changed: 120 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
import {
2-
consumeCredits,
3-
consumeOrganizationCredits,
4-
getUserCostPerCredit,
5-
} from '@codebuff/billing'
1+
import { consumeCredits, consumeOrganizationCredits } from '@codebuff/billing'
62
import { trackEvent } from '@codebuff/common/analytics'
73
import { AnalyticsEvent } from '@codebuff/common/constants/analytics-events'
84
import db from '@codebuff/common/db/index'
@@ -344,9 +340,35 @@ type InsertMessageParams = {
344340
latencyMs: number
345341
}
346342

343+
export async function insertMessageRecordWithRetries(
344+
params: InsertMessageParams,
345+
maxRetries = 3,
346+
): Promise<typeof schema.message.$inferSelect> {
347+
for (let attempt = 1; attempt <= maxRetries; attempt++) {
348+
try {
349+
return await insertMessageRecord(params)
350+
} catch (error) {
351+
if (attempt === maxRetries) {
352+
logger.error(
353+
{ messageId: params.messageId, error, attempt },
354+
`Failed to save message after ${maxRetries} attempts`,
355+
)
356+
throw error
357+
} else {
358+
logger.warn(
359+
{ messageId: params.messageId, error: error },
360+
`Retrying save message to DB (attempt ${attempt}/${maxRetries})`,
361+
)
362+
await new Promise((resolve) => setTimeout(resolve, 1000 * attempt))
363+
}
364+
}
365+
}
366+
throw new Error('Failed to save message after all attempts.')
367+
}
368+
347369
async function insertMessageRecord(
348370
params: InsertMessageParams,
349-
): Promise<typeof schema.message.$inferSelect | null> {
371+
): Promise<typeof schema.message.$inferSelect> {
350372
const {
351373
messageId,
352374
userId,
@@ -371,48 +393,36 @@ async function insertMessageRecord(
371393
const orgId = requestContext?.approvedOrgIdForRepo
372394
const repoUrl = requestContext?.processedRepoUrl
373395

374-
try {
375-
const insertResult = await db
376-
.insert(schema.message)
377-
.values({
378-
...stripNullCharsFromObject({
379-
id: messageId,
380-
user_id: userId,
381-
client_id: clientSessionId,
382-
client_request_id: userInputId,
383-
model: model,
384-
request: request,
385-
response: response,
386-
}),
387-
input_tokens: inputTokens,
388-
output_tokens: outputTokens,
389-
cache_creation_input_tokens: cacheCreationInputTokens,
390-
cache_read_input_tokens: cacheReadInputTokens,
391-
cost: cost.toString(),
392-
credits: creditsUsed,
393-
finished_at: finishedAt,
394-
latency_ms: latencyMs,
395-
org_id: orgId || null,
396-
repo_url: repoUrl || null,
397-
})
398-
.returning()
396+
const insertResult = await db
397+
.insert(schema.message)
398+
.values({
399+
...stripNullCharsFromObject({
400+
id: messageId,
401+
user_id: userId,
402+
client_id: clientSessionId,
403+
client_request_id: userInputId,
404+
model: model,
405+
request: request,
406+
response: response,
407+
}),
408+
input_tokens: inputTokens,
409+
output_tokens: outputTokens,
410+
cache_creation_input_tokens: cacheCreationInputTokens,
411+
cache_read_input_tokens: cacheReadInputTokens,
412+
cost: cost.toString(),
413+
credits: creditsUsed,
414+
finished_at: finishedAt,
415+
latency_ms: latencyMs,
416+
org_id: orgId || null,
417+
repo_url: repoUrl || null,
418+
})
419+
.returning()
399420

400-
if (insertResult.length > 0) {
401-
return insertResult[0]
402-
} else {
403-
logger.error(
404-
{ messageId: messageId },
405-
'Failed to insert message into DB (no rows returned).',
406-
)
407-
return null
408-
}
409-
} catch (dbError) {
410-
logger.error(
411-
{ messageId: messageId, error: dbError },
412-
'Error saving message to DB.',
413-
)
414-
return null
421+
if (insertResult.length === 0) {
422+
throw new Error('Failed to insert message into DB (no rows returned).')
415423
}
424+
425+
return insertResult[0]
416426
}
417427

418428
async function sendCostResponseToClient(
@@ -461,6 +471,36 @@ type CreditConsumptionResult = {
461471
fromPurchased: number
462472
}
463473

474+
async function updateUserCycleUsageWithRetries(
475+
userId: string,
476+
creditsUsed: number,
477+
maxRetries = 3,
478+
): Promise<CreditConsumptionResult> {
479+
const requestContext = getRequestContext()
480+
const orgId = requestContext?.approvedOrgIdForRepo
481+
482+
for (let attempt = 1; attempt <= maxRetries; attempt++) {
483+
try {
484+
return await updateUserCycleUsage(userId, creditsUsed)
485+
} catch (error) {
486+
if (attempt === maxRetries) {
487+
logger.error(
488+
{ userId, orgId, creditsUsed, error, attempt },
489+
`Failed to update user cycle usage after ${maxRetries} attempts`,
490+
)
491+
throw error
492+
} else {
493+
logger.warn(
494+
{ userId, orgId, creditsUsed, error: error },
495+
`Retrying update user cycle usage (attempt ${attempt}/${maxRetries})`,
496+
)
497+
await new Promise((resolve) => setTimeout(resolve, 1000 * attempt))
498+
}
499+
}
500+
}
501+
throw new Error('Failed to update user cycle usage after all attempts.')
502+
}
503+
464504
async function updateUserCycleUsage(
465505
userId: string,
466506
creditsUsed: number,
@@ -479,50 +519,42 @@ async function updateUserCycleUsage(
479519
const requestContext = getRequestContext()
480520
const orgId = requestContext?.approvedOrgIdForRepo
481521

482-
try {
483-
if (orgId) {
484-
// TODO: use `consumeCreditsWithFallback` to handle organization delegation
485-
// Consume from organization credits
486-
const result = await consumeOrganizationCredits(orgId, creditsUsed)
522+
if (orgId) {
523+
// TODO: use `consumeCreditsWithFallback` to handle organization delegation
524+
// Consume from organization credits
525+
const result = await consumeOrganizationCredits(orgId, creditsUsed)
487526

488-
if (VERBOSE) {
489-
logger.debug(
490-
{ userId, orgId, creditsUsed, ...result },
491-
`Consumed organization credits (${creditsUsed})`,
492-
)
493-
}
527+
if (VERBOSE) {
528+
logger.debug(
529+
{ userId, orgId, creditsUsed, ...result },
530+
`Consumed organization credits (${creditsUsed})`,
531+
)
532+
}
494533

495-
trackEvent(AnalyticsEvent.CREDIT_CONSUMED, userId, {
496-
creditsUsed,
497-
fromPurchased: result.fromPurchased,
498-
organizationId: orgId,
499-
})
534+
trackEvent(AnalyticsEvent.CREDIT_CONSUMED, userId, {
535+
creditsUsed,
536+
fromPurchased: result.fromPurchased,
537+
organizationId: orgId,
538+
})
500539

501-
return result
502-
} else {
503-
// Consume from personal credits
504-
const result = await consumeCredits(userId, creditsUsed)
540+
return result
541+
} else {
542+
// Consume from personal credits
543+
const result = await consumeCredits(userId, creditsUsed)
505544

506-
if (VERBOSE) {
507-
logger.debug(
508-
{ userId, creditsUsed, ...result },
509-
`Consumed personal credits (${creditsUsed})`,
510-
)
511-
}
545+
if (VERBOSE) {
546+
logger.debug(
547+
{ userId, creditsUsed, ...result },
548+
`Consumed personal credits (${creditsUsed})`,
549+
)
550+
}
512551

513-
trackEvent(AnalyticsEvent.CREDIT_CONSUMED, userId, {
514-
creditsUsed,
515-
fromPurchased: result.fromPurchased,
516-
})
552+
trackEvent(AnalyticsEvent.CREDIT_CONSUMED, userId, {
553+
creditsUsed,
554+
fromPurchased: result.fromPurchased,
555+
})
517556

518-
return result
519-
}
520-
} catch (error) {
521-
logger.error(
522-
{ userId, orgId, creditsUsed, error },
523-
'Error consuming credits.',
524-
)
525-
throw error
557+
return result
526558
}
527559
}
528560

@@ -564,10 +596,7 @@ export const saveMessage = async (value: {
564596
)
565597

566598
// Default to 1 cent per credit
567-
let centsPerCredit = 1
568-
if (value.userId) {
569-
centsPerCredit = await getUserCostPerCredit(value.userId)
570-
}
599+
const centsPerCredit = 1
571600

572601
const costInCents =
573602
value.chargeUser ?? true // default to true
@@ -617,21 +646,21 @@ export const saveMessage = async (value: {
617646
value.agentId,
618647
)
619648

620-
const savedMessageResult = await insertMessageRecord({
649+
await insertMessageRecordWithRetries({
621650
...value,
622651
cost,
623652
creditsUsed,
624653
})
625654

626-
if (!savedMessageResult || !value.userId) {
655+
if (!value.userId) {
627656
logger.debug(
628657
{ messageId: value.messageId, userId: value.userId },
629658
'Skipping further processing (no user ID or failed to save message).',
630659
)
631660
return 0
632661
}
633662

634-
const consumptionResult = await updateUserCycleUsage(
663+
const consumptionResult = await updateUserCycleUsageWithRetries(
635664
value.userId,
636665
creditsUsed,
637666
)

packages/billing/src/auto-topup.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import { stripeServer } from '@codebuff/common/util/stripe'
1010
import { eq } from 'drizzle-orm'
1111

1212
import { calculateUsageAndBalance } from './balance-calculator'
13-
import { getUserCostPerCredit } from './conversion'
1413
import { processAndGrantCredit } from './grant-credits'
1514
import {
1615
calculateOrganizationUsageAndBalance,
@@ -141,7 +140,7 @@ async function processAutoTopupPayment(
141140
const idempotencyKey = `auto-topup-${userId}-${timestamp}`
142141
const operationId = idempotencyKey // Use same ID for both Stripe and our DB
143142

144-
const centsPerCredit = await getUserCostPerCredit(userId)
143+
const centsPerCredit = 1
145144
const amountInCents = convertCreditsToUsdCents(amountToTopUp, centsPerCredit)
146145

147146
if (amountInCents <= 0) {

packages/billing/src/conversion.ts

Lines changed: 0 additions & 9 deletions
This file was deleted.

packages/billing/src/index.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@ export * from './balance-calculator'
77
// Credit grant operations
88
export * from './grant-credits'
99

10-
// Credit conversion utilities
11-
export * from './conversion'
12-
1310
// Organization billing
1411
export * from './org-billing'
1512

0 commit comments

Comments
 (0)