Skip to content

Commit c39155b

Browse files
committed
More reviewer improvments
1 parent 458616a commit c39155b

File tree

4 files changed

+121
-68
lines changed

4 files changed

+121
-68
lines changed

packages/billing/src/subscription-webhooks.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type SubscriptionStatus = (typeof schema.subscriptionStatusEnum.enumValues)[numb
2121
function mapStripeStatus(status: Stripe.Subscription.Status): SubscriptionStatus {
2222
const validStatuses: readonly string[] = schema.subscriptionStatusEnum.enumValues
2323
if (validStatuses.includes(status)) return status as SubscriptionStatus
24-
return 'active'
24+
return 'incomplete'
2525
}
2626

2727
// ---------------------------------------------------------------------------

packages/billing/src/subscription.ts

Lines changed: 81 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
eq,
1515
gt,
1616
gte,
17+
inArray,
1718
isNull,
1819
lt,
1920
or,
@@ -284,7 +285,7 @@ export async function ensureActiveBlockGrant(params: {
284285
conn: tx,
285286
})
286287

287-
if (weekly.used >= weekly.limit) {
288+
if (weekly.remaining <= 0) {
288289
trackEvent({
289290
event: AnalyticsEvent.SUBSCRIPTION_WEEKLY_LIMIT_HIT,
290291
userId,
@@ -304,7 +305,8 @@ export async function ensureActiveBlockGrant(params: {
304305
} satisfies WeeklyLimitError
305306
}
306307

307-
// 4. Create new block grant
308+
// 4. Create new block grant (capped to weekly remaining)
309+
const blockCredits = Math.min(limits.creditsPerBlock, weekly.remaining)
308310
const expiresAt = addHours(now, limits.blockDurationHours)
309311
const operationId = `block-${subscriptionId}-${now.getTime()}`
310312

@@ -315,8 +317,8 @@ export async function ensureActiveBlockGrant(params: {
315317
user_id: userId,
316318
stripe_subscription_id: subscriptionId,
317319
type: 'subscription',
318-
principal: limits.creditsPerBlock,
319-
balance: limits.creditsPerBlock,
320+
principal: blockCredits,
321+
balance: blockCredits,
320322
priority: GRANT_PRIORITIES.subscription,
321323
expires_at: expiresAt,
322324
description: `${SUBSCRIPTION_DISPLAY_NAME} block (${limits.blockDurationHours}h)`,
@@ -336,7 +338,7 @@ export async function ensureActiveBlockGrant(params: {
336338
properties: {
337339
subscriptionId,
338340
operationId,
339-
credits: limits.creditsPerBlock,
341+
credits: blockCredits,
340342
expiresAt: expiresAt.toISOString(),
341343
weeklyUsed: weekly.used,
342344
weeklyLimit: weekly.limit,
@@ -349,15 +351,15 @@ export async function ensureActiveBlockGrant(params: {
349351
userId,
350352
subscriptionId,
351353
operationId,
352-
credits: limits.creditsPerBlock,
354+
credits: blockCredits,
353355
expiresAt,
354356
},
355357
'Created new subscription block grant',
356358
)
357359

358360
return {
359361
grantId: newGrant.operation_id,
360-
credits: limits.creditsPerBlock,
362+
credits: blockCredits,
361363
expiresAt,
362364
isNew: true,
363365
} satisfies BlockGrant
@@ -414,23 +416,24 @@ export async function checkRateLimit(params: {
414416
}
415417
}
416418

417-
// Find most recent subscription block grant for this user
419+
// Find most recent active subscription block grant for this user
418420
const blocks = await db
419421
.select()
420422
.from(schema.creditLedger)
421423
.where(
422424
and(
423425
eq(schema.creditLedger.user_id, userId),
424426
eq(schema.creditLedger.type, 'subscription'),
427+
gt(schema.creditLedger.expires_at, now),
425428
),
426429
)
427430
.orderBy(desc(schema.creditLedger.created_at))
428431
.limit(1)
429432

430433
const currentBlock = blocks[0]
431434

432-
// No block yet or block expired → can start a new one
433-
if (!currentBlock || !currentBlock.expires_at || currentBlock.expires_at <= now) {
435+
// No active block → can start a new one
436+
if (!currentBlock) {
434437
return {
435438
limited: false,
436439
canStartNewBlock: true,
@@ -449,7 +452,7 @@ export async function checkRateLimit(params: {
449452
canStartNewBlock: false,
450453
blockUsed: currentBlock.principal,
451454
blockLimit: currentBlock.principal,
452-
blockResetsAt: currentBlock.expires_at,
455+
blockResetsAt: currentBlock.expires_at!,
453456
weeklyUsed: weekly.used,
454457
weeklyLimit: weekly.limit,
455458
weeklyResetsAt: weekly.resetsAt,
@@ -463,7 +466,7 @@ export async function checkRateLimit(params: {
463466
canStartNewBlock: false,
464467
blockUsed: currentBlock.principal - currentBlock.balance,
465468
blockLimit: currentBlock.principal,
466-
blockResetsAt: currentBlock.expires_at,
469+
blockResetsAt: currentBlock.expires_at!,
467470
weeklyUsed: weekly.used,
468471
weeklyLimit: weekly.limit,
469472
weeklyResetsAt: weekly.resetsAt,
@@ -523,22 +526,25 @@ export async function handleSubscribe(params: {
523526
const { userId, stripeSubscription, logger } = params
524527
const newResetDate = new Date(stripeSubscription.current_period_end * 1000)
525528

526-
await withAdvisoryLockTransaction({
529+
const { result: didMigrate } = await withAdvisoryLockTransaction({
527530
callback: async (tx) => {
528-
// Idempotency: skip if this subscription was already processed
529-
// Must be inside the lock to prevent TOCTOU races on concurrent webhooks
530-
const existing = await tx
531-
.select({ stripe_subscription_id: schema.subscription.stripe_subscription_id })
532-
.from(schema.subscription)
533-
.where(eq(schema.subscription.stripe_subscription_id, stripeSubscription.id))
531+
// Idempotency: check if credits were already migrated for this subscription.
532+
// We use the credit_ledger instead of the subscription table because
533+
// handleSubscriptionUpdated may upsert the subscription row before
534+
// invoice.paid fires, which would cause this check to skip migration.
535+
const migrationOpId = `subscribe-migrate-${stripeSubscription.id}`
536+
const existingMigration = await tx
537+
.select({ operation_id: schema.creditLedger.operation_id })
538+
.from(schema.creditLedger)
539+
.where(eq(schema.creditLedger.operation_id, migrationOpId))
534540
.limit(1)
535541

536-
if (existing.length > 0) {
542+
if (existingMigration.length > 0) {
537543
logger.info(
538544
{ userId, subscriptionId: stripeSubscription.id },
539-
'Subscription already processed — skipping handleSubscribe',
545+
'Credits already migrated — skipping handleSubscribe',
540546
)
541-
return
547+
return false
542548
}
543549

544550
// Move next_quota_reset to align with Stripe billing period
@@ -548,31 +554,41 @@ export async function handleSubscribe(params: {
548554
.where(eq(schema.user.id, userId))
549555

550556
// Migrate unused credits so nothing is lost
551-
await migrateUnusedCredits({ tx, userId, expiresAt: newResetDate, logger })
557+
await migrateUnusedCredits({
558+
tx,
559+
userId,
560+
subscriptionId: stripeSubscription.id,
561+
expiresAt: newResetDate,
562+
logger,
563+
})
564+
565+
return true
552566
},
553567
lockKey: `user:${userId}`,
554568
context: { userId, subscriptionId: stripeSubscription.id },
555569
logger,
556570
})
557571

558-
trackEvent({
559-
event: AnalyticsEvent.SUBSCRIPTION_CREATED,
560-
userId,
561-
properties: {
562-
subscriptionId: stripeSubscription.id,
563-
newResetDate: newResetDate.toISOString(),
564-
},
565-
logger,
566-
})
567-
568-
logger.info(
569-
{
572+
if (didMigrate) {
573+
trackEvent({
574+
event: AnalyticsEvent.SUBSCRIPTION_CREATED,
570575
userId,
571-
subscriptionId: stripeSubscription.id,
572-
newResetDate,
573-
},
574-
'Processed subscribe: reset date moved and credits migrated',
575-
)
576+
properties: {
577+
subscriptionId: stripeSubscription.id,
578+
newResetDate: newResetDate.toISOString(),
579+
},
580+
logger,
581+
})
582+
583+
logger.info(
584+
{
585+
userId,
586+
subscriptionId: stripeSubscription.id,
587+
newResetDate,
588+
},
589+
'Processed subscribe: reset date moved and credits migrated',
590+
)
591+
}
576592
}
577593

578594
// ---------------------------------------------------------------------------
@@ -592,20 +608,23 @@ type DbTransaction = Parameters<typeof db.transaction>[0] extends (
592608
async function migrateUnusedCredits(params: {
593609
tx: DbTransaction
594610
userId: string
611+
subscriptionId: string
595612
expiresAt: Date
596613
logger: Logger
597614
}): Promise<void> {
598-
const { tx, userId, expiresAt, logger } = params
615+
const { tx, userId, subscriptionId, expiresAt, logger } = params
599616
const now = new Date()
600617

601-
// Find all free/referral grants with remaining balance
618+
// Find all free/referral grants with remaining balance (excluding org grants)
602619
const unusedGrants = await tx
603620
.select()
604621
.from(schema.creditLedger)
605622
.where(
606623
and(
607624
eq(schema.creditLedger.user_id, userId),
608625
gt(schema.creditLedger.balance, 0),
626+
inArray(schema.creditLedger.type, ['free', 'referral']),
627+
isNull(schema.creditLedger.org_id),
609628
or(
610629
isNull(schema.creditLedger.expires_at),
611630
gt(schema.creditLedger.expires_at, now),
@@ -618,7 +637,27 @@ async function migrateUnusedCredits(params: {
618637
0,
619638
)
620639

640+
// Deterministic ID ensures idempotency — duplicate webhook deliveries
641+
// will hit onConflictDoNothing and the handleSubscribe caller checks
642+
// for this operation_id before running.
643+
const operationId = `subscribe-migrate-${subscriptionId}`
644+
621645
if (totalUnused === 0) {
646+
// Still insert the marker for idempotency so handleSubscribe's check
647+
// short-circuits on duplicate webhook deliveries.
648+
await tx
649+
.insert(schema.creditLedger)
650+
.values({
651+
operation_id: operationId,
652+
user_id: userId,
653+
type: 'free',
654+
principal: 0,
655+
balance: 0,
656+
priority: GRANT_PRIORITIES.free,
657+
expires_at: expiresAt,
658+
description: 'Migrated credits from subscription transition',
659+
})
660+
.onConflictDoNothing({ target: schema.creditLedger.operation_id })
622661
logger.debug({ userId }, 'No unused credits to migrate')
623662
return
624663
}
@@ -632,7 +671,6 @@ async function migrateUnusedCredits(params: {
632671
}
633672

634673
// Create a single migration grant preserving the total
635-
const operationId = `migration-${userId}-${crypto.randomUUID()}`
636674
await tx
637675
.insert(schema.creditLedger)
638676
.values({

packages/internal/src/db/schema.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ export const creditLedger = pgTable(
145145
.where(sql`${table.balance} != 0 AND ${table.expires_at} IS NULL`),
146146
index('idx_credit_ledger_org').on(table.org_id),
147147
index('idx_credit_ledger_subscription').on(
148-
table.stripe_subscription_id,
148+
table.user_id,
149149
table.type,
150150
table.created_at,
151151
),

0 commit comments

Comments
 (0)