Skip to content

feat(app): plan-based Stripe invoice sync#4040

Draft
hekike wants to merge 4 commits intomainfrom
feat/stripe-invoice-sync
Draft

feat(app): plan-based Stripe invoice sync#4040
hekike wants to merge 4 commits intomainfrom
feat/stripe-invoice-sync

Conversation

@hekike
Copy link
Copy Markdown
Contributor

@hekike hekike commented Mar 30, 2026

Summary

Replaces the inline, synchronous Stripe invoice sync logic with a persistent, plan-based execution model. Sync operations are persisted as an ordered plan in the database and executed asynchronously with checkpointing, crash recovery, and idempotency.

  • Billing service extensions: Add SyncExternalIDs for incremental Stripe ID sync-back, standard invoice state helpers, noop billing service for tests, and post-commit transaction hooks
  • New invoicesync package (openmeter/app/stripe/invoicesync/): Implements the full plan lifecycle — planner generates ordered operations, executor calls Stripe API with advisory locking, handler orchestrates async execution via events, adapter persists plans/ops to Postgres via Ent
  • Integration: Wire the new package into the Stripe app, replace legacy inline sync in entity/app/invoice.go, update DI across billing-worker/server/jobs entry points

Plan lifecycle

Each sync plan maps to a billing invoice state machine phase (draft, issuing, delete). Plans transition through pending → executing → completed/failed, with individual operations tracked and checkpointed independently.

Key design decisions

  • Crash recovery: Operations are persisted before execution; on restart, the handler picks up where it left off
  • Idempotency: Advisory locks prevent parallel plan execution for the same invoice
  • Supersession: When a new sync is triggered, all active plans for that invoice are canceled before creating a new one
  • External ID sync-back: After each Stripe API call, external IDs (Stripe invoice ID, line item IDs) are synced back to the billing invoice immediately

New DB tables

  • app_stripe_invoice_sync_plans — tracks sync plan lifecycle per invoice
  • app_stripe_invoice_sync_ops — tracks individual operations within a plan

Test plan

  • Unit tests for planner, executor, handler, adapter, currency helpers, types, and events (all included)
  • E2e sync plan tests in test/app/stripe/invoicesync_test.go
  • Existing TestStripeInvoicing tests updated for the new async path
  • make test passes
  • make lint passes

🤖 Generated with Claude Code

Summary by CodeRabbit

Release Notes

  • New Features

    • Stripe invoice operations (create, update, finalize, delete) now execute asynchronously with persistent sync plans for improved reliability and error recovery.
    • Added deterministic operation sequencing and idempotent execution to prevent duplicate Stripe API calls.
    • Stripe invoice external identifiers are now synced incrementally as operations complete.
  • Bug Fixes

    • Enhanced error handling with classified retryable vs non-retryable failure paths for Stripe operations.

@hekike hekike requested a review from a team as a code owner March 30, 2026 21:00
@hekike hekike marked this pull request as draft March 30, 2026 21:00
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Mar 30, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 2d20d21a-64fd-4408-b725-20bf4ae4fea9

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Introduces an async invoice synchronization system for Stripe that replaces synchronous invoice operations with persistent sync plans containing ordered operations executed via events. Includes adapter-based persistence, event-driven handler, locking mechanisms, result reconstruction, and billing service integration.

Changes

Cohort / File(s) Summary
Configuration & Wiring
app/common/app.go, app/common/openmeter_billingworker.go, cmd/billing-worker/wire_gen.go, cmd/jobs/internal/wire_gen.go, cmd/server/wire_gen.go
Added NewSyncPlanAdapter and NewSyncPlanService constructors; updated NewAppStripeService to accept syncPlanService dependency; wired NewBillingWorker with expanded dependencies for sync plan advisory locking and handler registration.
Core Types & Data Model
openmeter/app/stripe/invoicesync/types.go, openmeter/app/stripe/invoicesync/types_test.go
Defined sync phase/operation/status enums, persistent SyncPlan/SyncOperation structs, payload/response types for each operation, and deterministic idempotency key generation via SHA-256.
Service Contracts
openmeter/app/stripe/invoicesync/service.go, openmeter/app/stripe/invoicesync/adapter.go, openmeter/app/stripe/invoicesync/events.go, openmeter/app/stripe/invoicesync/events_test.go
Introduced Service interface for draft/issuing/delete sync plan creation and cancellation; Adapter interface for plan/operation persistence; ExecuteSyncPlanEvent for async event dispatch.
Service Implementation
openmeter/app/stripe/invoicesync/service/service.go
Implemented Service with plan generation, cancellation, transactional event publishing via transaction.OnCommit, and error handling.
Plan Generation
openmeter/app/stripe/invoicesync/planner.go, openmeter/app/stripe/invoicesync/planner_test.go
Generates deterministic, ordered sync operations for draft/issuing/delete workflows; diffs existing Stripe lines; handles discounts/tax behavior; produces idempotent payload marshaling.
Adapter Implementation
openmeter/app/stripe/invoicesync/adapter/adapter.go, openmeter/app/stripe/invoicesync/adapter/adapter_test.go
Ent-backed adapter for sync plan creation, retrieval, operation state transitions, plan lifecycle (complete/fail); includes transaction support and concurrent operation draining with deterministic sequencing.
Execution Handler
openmeter/app/stripe/invoicesync/executor.go, openmeter/app/stripe/invoicesync/executor_test.go
Executes sync operations sequentially, dispatches to operation-type handlers, classifies Stripe errors (retryable vs non-retryable), handles special tax-finalization retry, extracts/persists external IDs, and reconstructs billing results.
Event Handler
openmeter/app/stripe/invoicesync/handler.go, openmeter/app/stripe/invoicesync/handler_test.go, openmeter/app/stripe/invoicesync/handler_success_test.go
Processes ExecuteSyncPlanEvent within transactional advisory lock; detects plan supersession; executes operations; syncs external IDs; publishes follow-up events; handles plan completion/failure with billing state advancement.
Utility Functions
openmeter/app/stripe/invoicesync/currency.go, openmeter/app/stripe/invoicesync/currency_test.go
Extracted currency rounding/formatting utilities from removed StripeCalculator for use in sync operation payload construction.
Test Helpers
openmeter/app/stripe/invoicesync/app_noop_test.go, openmeter/app/stripe/invoicesync/secret_noop_test.go
No-op implementations of app.Service and secret.Service for test isolation.
Documentation
openmeter/app/stripe/invoicesync/README.md
Comprehensive guide covering sync plan lifecycle, operation generation, execution guarantees, locking, supersession, external ID sync-back, error classification, and retry behavior.
App Entity Updates
openmeter/app/stripe/entity/app/app.go
Added SyncPlanService field to App struct; validated in Validate().
Stripe Service Integration
openmeter/app/stripe/service/service.go, openmeter/app/stripe/service/factory.go
Added syncPlanService config and dependency to Service; wired into app creation.
Removed Code
openmeter/app/stripe/entity/app/calculator.go
Deleted StripeCalculator type and methods; migrated currency logic to invoicesync/currency.go.
Invoice Operations Refactoring
openmeter/app/stripe/entity/app/invoice.go
Replaced synchronous Stripe calls with async sync-plan creation; changed UpsertStandardInvoice/DeleteStandardInvoice/FinalizeStandardInvoice to return nil results after plan creation; added CanDraftSyncAdvance/CanIssuingSyncAdvance helpers.
Billing Service Interface
openmeter/billing/service.go, openmeter/billing/adapter.go
Added SyncExternalIDs and FailSyncInvoice methods to InvoiceAppService.
Billing Service Implementation
openmeter/billing/service/invoiceapp.go
Implemented external ID syncing (invoice/line/discount mappings); added plan failure handling via trigger flow; replaced state-mismatch error with structured InvoiceStateMismatchError.
Billing Adapter Implementation
openmeter/billing/adapter/invoiceapp.go
Transactional update of invoice/line/discount external invoicing app IDs with conditional skipping of missing records.
Billing Types & Errors
openmeter/billing/stdinvoice.go, openmeter/billing/errors.go, openmeter/billing/noop.go
Added SyncExternalIDsInput, FailSyncInvoiceInput types; added InvoiceStateMismatchError and detection helper; added comprehensive NoopService implementation.
State Machine Updates
openmeter/billing/service/stdinvoicestate.go
Clear stale sync completion metadata before draft/issuing operations to prevent race conditions with re-entered syncing states.
Database Schema
openmeter/ent/schema/app_stripe_invoice_sync.go, openmeter/ent/schema/billing.go
Added AppStripeInvoiceSyncPlan and AppStripeInvoiceSyncOp entities with proper indexing, uniqueness constraints, and cascade deletion; added edge from BillingInvoice.
Post-Commit Callback Framework
pkg/framework/transaction/postcommit.go, pkg/framework/transaction/transaction.go
Added OnCommit callback registration for deferred event publishing after transaction commit; integrated into Run to initialize callbacks on outermost transaction entry and execute after successful commit.
Test Infrastructure
test/app/stripe/invoice_test.go, openmeter/server/server_test.go, openmeter/testutils/pg_driver.go
Updated invoice tests to use sync plan execution helpers; replaced local NoopBillingService with shared billing.NoopService; broadened InitPostgresDB to accept testing.TB.
Repository Ignore
.gitignore
Added *.test pattern to exclude test files.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant App as Invoice<br/>Operation
  participant Service as Sync<br/>Service
  participant Adapter as Adapter<br/>(DB)
  participant Pub as Event<br/>Publisher
  participant Handler as Sync<br/>Handler
  participant Executor as Executor
  participant Stripe as Stripe<br/>API

  App->>Service: CreateDraftSyncPlan()
  Service->>Service: Generate operations
  Service->>Adapter: CreateSyncPlan()
  Adapter->>Adapter: Create plan + ops (DB)
  Service->>Pub: OnCommit(PublishEvent)
  Pub-->>Handler: ExecuteSyncPlanEvent
  
  Handler->>Adapter: GetSyncPlan()
  Handler->>Adapter: AcquireLock()
  Handler->>Executor: ExecuteNextOperation()
  Executor->>Adapter: GetNextPendingOp()
  Executor->>Stripe: StripeAPI (Create/Update/etc)
  Stripe-->>Executor: Response
  Executor->>Adapter: CompleteOperation(response)
  Handler->>Adapter: Check if done
  alt More ops remain
    Handler->>Pub: PublishEvent (next operation)
  else All done
    Handler->>Adapter: CompletePlan()
    Handler->>App: SyncDraftInvoice()
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

release-note/feature, area/apps

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 22.58% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat(app): plan-based Stripe invoice sync' accurately and concisely describes the main change—replacing synchronous Stripe invoice operations with a persistent, plan-based async execution model.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/stripe-invoice-sync

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@hekike hekike added the release-note/feature Release note: Exciting New Features label Mar 30, 2026
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
openmeter/testutils/pg_driver.go (1)

60-65: ⚠️ Potential issue | 🟡 Minor

Update the local Postgres test example.

The inline command here still points at POSTGRES_HOST=localhost and skips -tags=dynamic, so it nudges people toward a setup that doesn’t match this repo’s test requirements.

As per coding guidelines, **/*.go: Run tests with -tags=dynamic flag (required for confluent-kafka-go) and use POSTGRES_HOST=127.0.0.1 environment variable when Postgres is needed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/testutils/pg_driver.go` around lines 60 - 65, Update the inline
test example in the InitPostgresDB comment to use the repo-required test flags
and host: change the example command that currently shows
"POSTGRES_HOST=localhost go test ./internal/credit/..." to use
POSTGRES_HOST=127.0.0.1 and include the -tags=dynamic flag (e.g.,
POSTGRES_HOST=127.0.0.1 go test -tags=dynamic ./...) so the comment in
InitPostgresDB reflects the correct way to run tests that require Postgres and
confluent-kafka-go.
🧹 Nitpick comments (6)
openmeter/app/stripe/invoicesync/app_noop_test.go (1)

12-76: Make unexpected stub calls fail fast.

Right now every method returns zero values or nil, so an accidental dependency call can slip through and make the sync tests fail much later in a less obvious spot. Returning an explicit error for methods this suite shouldn’t touch will make the tests a lot sharper.

As per coding guidelines, **/*_test.go: Make sure the tests are comprehensive and cover the changes. Keep a strong focus on unit tests and in-code integration tests.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/app/stripe/invoicesync/app_noop_test.go` around lines 12 - 76, The
noopAppService currently returns zero values and nil errors allowing accidental
calls to go unnoticed; update noopAppService so any method that tests shouldn't
invoke returns an explicit error (e.g., fmt.Errorf or errors.New) containing the
method name to fail fast — modify each method on noopAppService (e.g.,
RegisterMarketplaceListing, GetMarketplaceListing, ListMarketplaceListings,
InstallMarketplaceListingWithAPIKey, InstallMarketplaceListing,
GetMarketplaceListingOauth2InstallURL, AuthorizeMarketplaceListingOauth2Install,
CreateApp, UpdateAppStatus, GetApp, UpdateApp, ListApps, UninstallApp,
ListCustomerData, EnsureCustomer, DeleteCustomer) to return a clear "unexpected
call to <MethodName>" error instead of nil; keep only those specific stubs that
tests legitimately use returning zero values if needed.
openmeter/billing/service/stdinvoicestate.go (2)

717-721: Same hardcoded string concern applies here.

Similar to the draft sync metadata clearing, consider using the invoicesync constants for the issuing sync keys as well.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/billing/service/stdinvoicestate.go` around lines 717 - 721, Replace
the hardcoded metadata keys used when clearing issuing sync metadata with the
corresponding constants from the invoicesync package instead of string literals;
specifically, update the block that checks m.Invoice.Metadata and deletes
"openmeter.io/stripe/issuing-sync-completed-at" and
"openmeter.io/stripe/issuing-sync-plan-id" to use the invoicesync constants
(e.g., invoicesync.IssuingSyncCompletedAtKey and
invoicesync.IssuingSyncPlanIDKey or whatever the exact names are) so the deletes
reference the centralized keys rather than duplicated strings.

682-689: Import the metadata key constants from invoicesync instead of duplicating hardcoded strings.

The invoicesync package already exports these constants in openmeter/app/stripe/invoicesync/types.go, and other parts of the codebase use them (like openmeter/app/stripe/invoicesync/handler.go and openmeter/app/stripe/entity/app/invoice.go). Using the constants here too keeps things in sync—if those keys ever change, you won't need to hunt through this file to update hardcoded strings.

♻️ Suggested refactor
+import "github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync"
+
 // Clear any previous sync completion metadata so canDraftSyncAdvance returns false
 // until the new sync plan completes.
 if m.Invoice.Metadata != nil {
-    delete(m.Invoice.Metadata, "openmeter.io/stripe/draft-sync-completed-at")
-    delete(m.Invoice.Metadata, "openmeter.io/stripe/draft-sync-plan-id")
+    delete(m.Invoice.Metadata, invoicesync.MetadataKeyDraftSyncCompletedAt)
+    delete(m.Invoice.Metadata, invoicesync.MetadataKeyDraftSyncPlanID)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/billing/service/stdinvoicestate.go` around lines 682 - 689, Replace
the hardcoded metadata keys with the exported constants from the invoicesync
package: use invoicesync.MetadataKeyDraftSyncCompletedAt and
invoicesync.MetadataKeyDraftSyncPlanID instead of the literal strings; add an
import for the invoicesync package and update the delete calls on
m.Invoice.Metadata to reference those constants (keep the existing nil-check).
This ensures the keys remain consistent with invoicesync's definitions.
openmeter/app/stripe/invoicesync/types_test.go (1)

48-50: Validate the encoding, not just the digest length.

Line 50 only proves the key is 64 characters long, so a non-hex string of that length still passes. Adding an actual hex assertion would make this test enforce the SHA-256 contract it names.

As per coding guidelines, "**/*_test.go: Make sure the tests are comprehensive and cover the changes."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/app/stripe/invoicesync/types_test.go` around lines 48 - 50, The
test only checks length but not that GenerateIdempotencyKey returns a
hex-encoded SHA-256 string; update the test for the case in t.Run("key is hex
encoded sha256") to assert the key is valid hex (e.g. use
encoding/hex.DecodeString or a regexp like ^[0-9a-f]{64}$) in addition to
require.Len, referencing GenerateIdempotencyKey and OpTypeInvoiceCreate so the
test enforces the SHA-256 hex contract.
openmeter/app/stripe/invoicesync/handler_test.go (1)

16-23: Expand this to one case per required HandlerConfig field.

An empty config only proves the constructor rejects a missing Adapter. The rest of the required fields can regress one by one without this test noticing, so the test name currently over-promises a bit.

As per coding guidelines, "**/*_test.go: Make sure the tests are comprehensive and cover the changes."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/app/stripe/invoicesync/handler_test.go` around lines 16 - 23, The
current TestNewHandler_AllFieldsRequired only verifies that
NewHandler(HandlerConfig{}) fails for a missing Adapter; expand it into one test
per required HandlerConfig field by creating a table-driven or subtest loop in
TestNewHandler_AllFieldsRequired that constructs configs with exactly one
required field set and the rest nil, calls NewHandler, and asserts an error
contains the expected message for that missing field (referencing NewHandler and
HandlerConfig); ensure you include checks for Adapter, Logger, DB/Store
(whichever other required symbols exist in HandlerConfig) and assert the error
string mentions each specific required field (e.g., "adapter is required",
"logger is required", etc.).
openmeter/app/stripe/invoicesync/planner.go (1)

250-281: The "deterministic" ordering still has a couple random edges.

amountDiscountsById is iterated as a map, updateLines never gets sorted, and the add-path sort key is only Description. Two discounts or lines with the same description can still flip order between runs, which makes the marshaled payloads flaky. Sorting by a stable composite key like om_line_type + om_line_id would make the plan reproducible.

Also applies to: 337-361, 404-407

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/app/stripe/invoicesync/planner.go` around lines 250 - 281,
amountDiscountsById is iterated in non-deterministic order and
updateLines/addLines are not deterministically sorted (add-path only sorts by
Description), causing flaky payload order; before iterating amountDiscountsById
sort that slice by a stable composite key (e.g., discount.OmLineType + "|" +
discount.OmLineID) and after building updateLines and addLines sort those slices
by the same composite key so both update and add operations have a reproducible
order; apply the same deterministic sorting approach in the other affected
blocks referenced (around the logic that builds add/update lines in the same
file, e.g., the areas noted at 337-361 and 404-407) and ensure you use the
unique symbols amountDiscountsById, updateLines, addLines, toDiscountAddParams
and toDiscountUpdateParams to locate and change the code.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@openmeter/app/stripe/entity/app/invoice.go`:
- Around line 67-76: The code currently blocks sync-plan creation on a live
Stripe read by calling getStripeClient and stripeClient.ListInvoiceLineItems
when invoice.ExternalIDs.Invoicing is present; remove that synchronous remote
call from the transactional/create path in the function that builds the sync
plan (the block using getStripeClient and ListInvoiceLineItems), and instead
record the Stripe ID or a "needs-remote-line-sync" flag in the created plan so
fetching invoice lines is deferred to plan execution (or an async worker) where
you can call getStripeClient and ListInvoiceLineItems; ensure the created sync
plan no longer waits on Stripe and that execution time uses the stored
invoice.ExternalIDs.Invoicing to retrieve line items.

In `@openmeter/app/stripe/invoicesync/adapter/adapter_test.go`:
- Around line 495-500: The test builds a 64KiB blob but only serializes the
first 32 bytes into StripeInvoiceID, so the DB never sees a large payload;
update the test to serialize the full blob into the payload passed to
CreateSyncPlan (or construct a json.RawMessage from the full large byte slice)
so the created RawMessage/InvoiceUpdatePayload contains the entire 64KiB data;
specifically change the construction around invoicesync.InvoiceUpdatePayload /
rawPayload to use string(large) or json.RawMessage(large) (and ensure the
rawPayload variable used by CreateSyncPlan is the full-blob RawMessage) so the
test actually writes a large payload.

In `@openmeter/app/stripe/invoicesync/currency.go`:
- Around line 28-45: FormatAmount currently formats the raw decimal which can
drift from what Stripe actually charges; instead, call the existing rounding
helper (RoundToAmount) to convert the incoming alpacadecimal.Decimal into the
rounded minor-unit amount used for billing, then feed that rounded value into
the integer/minor-unit formatting path (the same path used when
amount.IsInteger()) via the currency calculator from
currencyx.Code(currency).Calculator() so display strings always match billed
subunits; update the non-integer branch in FormatAmount to use RoundToAmount and
then FormatAmount on the resulting minor-unit value.

In `@openmeter/app/stripe/invoicesync/handler.go`:
- Around line 206-213: The publish calls for follow-up events are executed
before the surrounding DB transaction commits, causing race conditions with
advisory locks; wrap both branches that call h.publisher.Publish (the one when
!result.Done and the delete branch around lines noted) in transaction.OnCommit
(or equivalent tx.OnCommit) so the Publish is invoked only after successful
commit, i.e., enqueue a closure that calls h.publisher.Publish(ctx,
ExecuteSyncPlanEvent{...}) inside transaction.OnCommit rather than calling
Publish directly inside the transaction.

In `@openmeter/app/stripe/invoicesync/planner_test.go`:
- Around line 582-587: The test only asserts tax fields for regularLine but
misses the discount line; update the test to also assert on discountLine (the
variable representing the discount invoice line) by checking
discountLine.TaxBehavior is non-nil and equals "exclusive" and
discountLine.TaxCode is non-nil and equals "txcd_10000000" so the discount tax
propagation is covered alongside the regular line assertions.

In `@openmeter/app/stripe/invoicesync/README.md`:
- Line 7: Update each bare fenced code block (``` ) in the README.md to include
a language identifier (use "text") so they read ```text; specifically change all
occurrences of bare fences that render the ASCII diagrams/snippets to fenced
blocks with the "text" language to satisfy markdownlint MD040 and keep make lint
green.

In `@openmeter/app/stripe/invoicesync/service/service.go`:
- Around line 56-97: The cancelAllActivePlans call in CreateDraftSyncPlan,
CreateIssuingSyncPlan and CreateDeleteSyncPlan can race with Handler.Handle
because it runs without the same invoice-scoped lock; change each Create* method
to acquire the same invoice-scoped lock used by Handler.Handle (wrap
cancelAllActivePlans, the Generate* call and the subsequent createAndPublish
within that lock), hold the lock while checking ExternalIDs for deletes so the
existence check and plan creation are atomic, and ensure proper defer/unlock to
avoid deadlocks.

In `@openmeter/billing/adapter/invoiceapp.go`:
- Around line 35-59: The loops update child rows by ID only, which can mutate
rows belonging to other invoices; restrict updates to children owned by the
invoice in in.Invoice. For BillingStandardInvoiceDetailedLine and
BillingInvoiceLineDiscount (the UpdateOneID calls on
tx.db.BillingStandardInvoiceDetailedLine and tx.db.BillingInvoiceLineDiscount),
change the update builder to include a predicate that the child’s parent invoice
equals in.Invoice (e.g. use a Where/Has... predicate or an Update().Where(... ID
== lineID AND invoice_id == in.Invoice) variant) so that if the ID does not
belong to the invoice the operation becomes a no-op/skip rather than updating
another invoice’s row; keep the existing NotFound handling for skipped rows.

In `@openmeter/ent/schema/app_stripe_invoice_sync.go`:
- Around line 46-52: Add validation to the enum-backed string fields so bad
values fail fast: for the fields defined as
field.String("phase").GoType(invoicesync.SyncPlanPhase("")) and
field.String("status").GoType(invoicesync.PlanStatus("")) (and the "type" field
in the operation schema), add .Validate(...) checks that only allow the known
enum values (derived from invoicesync.SyncPlanPhase and invoicesync.PlanStatus
constants) or add a DB CHECK constraint to those columns; update the Plan schema
(phase/status) and the Operation schema ("type") accordingly so persistence will
reject invalid strings instead of allowing silent fallbacks in executor code.

In `@pkg/framework/transaction/transaction.go`:
- Around line 49-65: The post-commit callbacks are being invoked with the same
ctx that still contains the transaction Driver and the post-commit queue marker,
which can cause callback code to reuse the committed tx or append to the
draining queue; fix by creating a tx-free, callback-free context before calling
runPostCommitCallbacks (e.g. add a helper clearTransactionContext(ctx) that
removes the transaction key and the post-commit-queue marker from context and
return that cleaned context), then call runPostCommitCallbacks(cleanCtx) instead
of runPostCommitCallbacks(ctx); reference initPostCommitCallbacks,
runPostCommitCallbacks, manage, and Driver to locate where to clear the context
and ensure nested OnCommit inside callbacks uses a fresh queue.

---

Outside diff comments:
In `@openmeter/testutils/pg_driver.go`:
- Around line 60-65: Update the inline test example in the InitPostgresDB
comment to use the repo-required test flags and host: change the example command
that currently shows "POSTGRES_HOST=localhost go test ./internal/credit/..." to
use POSTGRES_HOST=127.0.0.1 and include the -tags=dynamic flag (e.g.,
POSTGRES_HOST=127.0.0.1 go test -tags=dynamic ./...) so the comment in
InitPostgresDB reflects the correct way to run tests that require Postgres and
confluent-kafka-go.

---

Nitpick comments:
In `@openmeter/app/stripe/invoicesync/app_noop_test.go`:
- Around line 12-76: The noopAppService currently returns zero values and nil
errors allowing accidental calls to go unnoticed; update noopAppService so any
method that tests shouldn't invoke returns an explicit error (e.g., fmt.Errorf
or errors.New) containing the method name to fail fast — modify each method on
noopAppService (e.g., RegisterMarketplaceListing, GetMarketplaceListing,
ListMarketplaceListings, InstallMarketplaceListingWithAPIKey,
InstallMarketplaceListing, GetMarketplaceListingOauth2InstallURL,
AuthorizeMarketplaceListingOauth2Install, CreateApp, UpdateAppStatus, GetApp,
UpdateApp, ListApps, UninstallApp, ListCustomerData, EnsureCustomer,
DeleteCustomer) to return a clear "unexpected call to <MethodName>" error
instead of nil; keep only those specific stubs that tests legitimately use
returning zero values if needed.

In `@openmeter/app/stripe/invoicesync/handler_test.go`:
- Around line 16-23: The current TestNewHandler_AllFieldsRequired only verifies
that NewHandler(HandlerConfig{}) fails for a missing Adapter; expand it into one
test per required HandlerConfig field by creating a table-driven or subtest loop
in TestNewHandler_AllFieldsRequired that constructs configs with exactly one
required field set and the rest nil, calls NewHandler, and asserts an error
contains the expected message for that missing field (referencing NewHandler and
HandlerConfig); ensure you include checks for Adapter, Logger, DB/Store
(whichever other required symbols exist in HandlerConfig) and assert the error
string mentions each specific required field (e.g., "adapter is required",
"logger is required", etc.).

In `@openmeter/app/stripe/invoicesync/planner.go`:
- Around line 250-281: amountDiscountsById is iterated in non-deterministic
order and updateLines/addLines are not deterministically sorted (add-path only
sorts by Description), causing flaky payload order; before iterating
amountDiscountsById sort that slice by a stable composite key (e.g.,
discount.OmLineType + "|" + discount.OmLineID) and after building updateLines
and addLines sort those slices by the same composite key so both update and add
operations have a reproducible order; apply the same deterministic sorting
approach in the other affected blocks referenced (around the logic that builds
add/update lines in the same file, e.g., the areas noted at 337-361 and 404-407)
and ensure you use the unique symbols amountDiscountsById, updateLines,
addLines, toDiscountAddParams and toDiscountUpdateParams to locate and change
the code.

In `@openmeter/app/stripe/invoicesync/types_test.go`:
- Around line 48-50: The test only checks length but not that
GenerateIdempotencyKey returns a hex-encoded SHA-256 string; update the test for
the case in t.Run("key is hex encoded sha256") to assert the key is valid hex
(e.g. use encoding/hex.DecodeString or a regexp like ^[0-9a-f]{64}$) in addition
to require.Len, referencing GenerateIdempotencyKey and OpTypeInvoiceCreate so
the test enforces the SHA-256 hex contract.

In `@openmeter/billing/service/stdinvoicestate.go`:
- Around line 717-721: Replace the hardcoded metadata keys used when clearing
issuing sync metadata with the corresponding constants from the invoicesync
package instead of string literals; specifically, update the block that checks
m.Invoice.Metadata and deletes "openmeter.io/stripe/issuing-sync-completed-at"
and "openmeter.io/stripe/issuing-sync-plan-id" to use the invoicesync constants
(e.g., invoicesync.IssuingSyncCompletedAtKey and
invoicesync.IssuingSyncPlanIDKey or whatever the exact names are) so the deletes
reference the centralized keys rather than duplicated strings.
- Around line 682-689: Replace the hardcoded metadata keys with the exported
constants from the invoicesync package: use
invoicesync.MetadataKeyDraftSyncCompletedAt and
invoicesync.MetadataKeyDraftSyncPlanID instead of the literal strings; add an
import for the invoicesync package and update the delete calls on
m.Invoice.Metadata to reference those constants (keep the existing nil-check).
This ensures the keys remain consistent with invoicesync's definitions.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: d7039ac4-27e9-4544-8e49-21fa86cc85d1

📥 Commits

Reviewing files that changed from the base of the PR and between e1aef59 and 80ffb51.

⛔ Files ignored due to path filters (35)
  • go.sum is excluded by !**/*.sum, !**/*.sum
  • openmeter/ent/db/appstripeinvoicesyncop.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncop/appstripeinvoicesyncop.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncop/where.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncop_create.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncop_delete.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncop_query.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncop_update.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncplan.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncplan/appstripeinvoicesyncplan.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncplan/where.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncplan_create.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncplan_delete.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncplan_query.go is excluded by !**/ent/db/**
  • openmeter/ent/db/appstripeinvoicesyncplan_update.go is excluded by !**/ent/db/**
  • openmeter/ent/db/billinginvoice.go is excluded by !**/ent/db/**
  • openmeter/ent/db/billinginvoice/billinginvoice.go is excluded by !**/ent/db/**
  • openmeter/ent/db/billinginvoice/where.go is excluded by !**/ent/db/**
  • openmeter/ent/db/billinginvoice_create.go is excluded by !**/ent/db/**
  • openmeter/ent/db/billinginvoice_query.go is excluded by !**/ent/db/**
  • openmeter/ent/db/billinginvoice_update.go is excluded by !**/ent/db/**
  • openmeter/ent/db/client.go is excluded by !**/ent/db/**
  • openmeter/ent/db/cursor.go is excluded by !**/ent/db/**
  • openmeter/ent/db/ent.go is excluded by !**/ent/db/**
  • openmeter/ent/db/entmixinaccessor.go is excluded by !**/ent/db/**
  • openmeter/ent/db/expose.go is excluded by !**/ent/db/**
  • openmeter/ent/db/hook/hook.go is excluded by !**/ent/db/**
  • openmeter/ent/db/migrate/schema.go is excluded by !**/ent/db/**
  • openmeter/ent/db/mutation.go is excluded by !**/ent/db/**
  • openmeter/ent/db/paginate.go is excluded by !**/ent/db/**
  • openmeter/ent/db/predicate/predicate.go is excluded by !**/ent/db/**
  • openmeter/ent/db/runtime.go is excluded by !**/ent/db/**
  • openmeter/ent/db/setorclear.go is excluded by !**/ent/db/**
  • openmeter/ent/db/tx.go is excluded by !**/ent/db/**
  • tools/migrate/migrations/atlas.sum is excluded by !**/*.sum, !**/*.sum
📒 Files selected for processing (51)
  • .gitignore
  • app/common/app.go
  • app/common/openmeter_billingworker.go
  • cmd/billing-worker/wire_gen.go
  • cmd/jobs/internal/wire_gen.go
  • cmd/server/wire_gen.go
  • openmeter/app/stripe/entity/app/app.go
  • openmeter/app/stripe/entity/app/calculator.go
  • openmeter/app/stripe/entity/app/invoice.go
  • openmeter/app/stripe/invoicesync/README.md
  • openmeter/app/stripe/invoicesync/adapter.go
  • openmeter/app/stripe/invoicesync/adapter/adapter.go
  • openmeter/app/stripe/invoicesync/adapter/adapter_test.go
  • openmeter/app/stripe/invoicesync/app_noop_test.go
  • openmeter/app/stripe/invoicesync/currency.go
  • openmeter/app/stripe/invoicesync/currency_test.go
  • openmeter/app/stripe/invoicesync/events.go
  • openmeter/app/stripe/invoicesync/events_test.go
  • openmeter/app/stripe/invoicesync/executor.go
  • openmeter/app/stripe/invoicesync/executor_test.go
  • openmeter/app/stripe/invoicesync/handler.go
  • openmeter/app/stripe/invoicesync/handler_success_test.go
  • openmeter/app/stripe/invoicesync/handler_test.go
  • openmeter/app/stripe/invoicesync/planner.go
  • openmeter/app/stripe/invoicesync/planner_test.go
  • openmeter/app/stripe/invoicesync/secret_noop_test.go
  • openmeter/app/stripe/invoicesync/service.go
  • openmeter/app/stripe/invoicesync/service/service.go
  • openmeter/app/stripe/invoicesync/types.go
  • openmeter/app/stripe/invoicesync/types_test.go
  • openmeter/app/stripe/service/factory.go
  • openmeter/app/stripe/service/service.go
  • openmeter/billing/adapter.go
  • openmeter/billing/adapter/invoiceapp.go
  • openmeter/billing/errors.go
  • openmeter/billing/noop.go
  • openmeter/billing/service.go
  • openmeter/billing/service/invoiceapp.go
  • openmeter/billing/service/stdinvoicestate.go
  • openmeter/billing/stdinvoice.go
  • openmeter/ent/schema/app_stripe_invoice_sync.go
  • openmeter/ent/schema/billing.go
  • openmeter/server/server_test.go
  • openmeter/testutils/pg_driver.go
  • pkg/framework/transaction/postcommit.go
  • pkg/framework/transaction/transaction.go
  • test/app/stripe/invoice_test.go
  • test/app/stripe/invoicesync_test.go
  • test/app/stripe/testenv.go
  • tools/migrate/migrations/20260329152423_app_stripe_invoice_sync.down.sql
  • tools/migrate/migrations/20260329152423_app_stripe_invoice_sync.up.sql
💤 Files with no reviewable changes (1)
  • openmeter/app/stripe/entity/app/calculator.go

Comment on lines +495 to +500
// Build a payload ~64 KiB.
large := bytes.Repeat([]byte("x"), 64*1024)
rawPayload, err := json.Marshal(invoicesync.InvoiceUpdatePayload{
StripeInvoiceID: string(large[:32]),
})
require.NoError(t, err)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

This test never stores a large payload.

Only large[:32] is serialized into StripeInvoiceID, so the JSON written to Postgres is tiny. The test stays green even if 64 KiB payloads are truncated. I'd build the RawMessage from the full blob before calling CreateSyncPlan.

🧪 Suggested fix
 	// Build a payload ~64 KiB.
 	large := bytes.Repeat([]byte("x"), 64*1024)
-	rawPayload, err := json.Marshal(invoicesync.InvoiceUpdatePayload{
-		StripeInvoiceID: string(large[:32]),
-	})
+	rawPayload, err := json.Marshal(map[string]string{
+		"blob": string(large),
+	})
 	require.NoError(t, err)

As per coding guidelines, "**/*_test.go: Make sure the tests are comprehensive and cover the changes. Keep a strong focus on unit tests and in-code integration tests."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/app/stripe/invoicesync/adapter/adapter_test.go` around lines 495 -
500, The test builds a 64KiB blob but only serializes the first 32 bytes into
StripeInvoiceID, so the DB never sees a large payload; update the test to
serialize the full blob into the payload passed to CreateSyncPlan (or construct
a json.RawMessage from the full large byte slice) so the created
RawMessage/InvoiceUpdatePayload contains the entire 64KiB data; specifically
change the construction around invoicesync.InvoiceUpdatePayload / rawPayload to
use string(large) or json.RawMessage(large) (and ensure the rawPayload variable
used by CreateSyncPlan is the full-blob RawMessage) so the test actually writes
a large payload.

Comment on lines +28 to +45
func FormatAmount(amount alpacadecimal.Decimal, currency string) (string, error) {
calc, err := currencyx.Code(currency).Calculator()
if err != nil {
return "", fmt.Errorf("invalid currency %q: %w", currency, err)
}

if amount.IsInteger() {
return calc.Def.FormatAmount(num.MakeAmount(amount.IntPart(), 0)), nil
}

am, exact := amount.Float64()
_ = exact // precision loss possible for display-only formatting; not used for monetary calculations
decimalPlaces := uint32(0)
if exp := amount.Exponent(); exp < 0 {
decimalPlaces = uint32(-exp)
}
return calc.Def.FormatAmount(num.AmountFromFloat64(am, decimalPlaces)), nil
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Format from the rounded minor-unit amount.

For non-integer inputs this renders the raw decimal/exponent, while RoundToAmount rounds to Stripe subunits first. That lets the display string drift from what we actually bill — e.g. 1.235 USD can format as 1.235, but Stripe gets 124 cents, and zero-decimal currencies can show fractional amounts that Stripe can never charge. Reusing the rounded minor-unit path keeps descriptions aligned with the billed value.

💸 Suggested fix
 func FormatAmount(amount alpacadecimal.Decimal, currency string) (string, error) {
 	calc, err := currencyx.Code(currency).Calculator()
 	if err != nil {
 		return "", fmt.Errorf("invalid currency %q: %w", currency, err)
 	}
-
-	if amount.IsInteger() {
-		return calc.Def.FormatAmount(num.MakeAmount(amount.IntPart(), 0)), nil
-	}
-
-	am, exact := amount.Float64()
-	_ = exact // precision loss possible for display-only formatting; not used for monetary calculations
-	decimalPlaces := uint32(0)
-	if exp := amount.Exponent(); exp < 0 {
-		decimalPlaces = uint32(-exp)
-	}
-	return calc.Def.FormatAmount(num.AmountFromFloat64(am, decimalPlaces)), nil
+	subunits := uint32(calc.Def.Subunits)
+	multiplier := alpacadecimal.NewFromInt(10).Pow(alpacadecimal.NewFromInt(int64(subunits)))
+	minorUnitAmount := amount.Mul(multiplier).Round(0).IntPart()
+
+	return calc.Def.FormatAmount(num.MakeAmount(minorUnitAmount, subunits)), nil
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func FormatAmount(amount alpacadecimal.Decimal, currency string) (string, error) {
calc, err := currencyx.Code(currency).Calculator()
if err != nil {
return "", fmt.Errorf("invalid currency %q: %w", currency, err)
}
if amount.IsInteger() {
return calc.Def.FormatAmount(num.MakeAmount(amount.IntPart(), 0)), nil
}
am, exact := amount.Float64()
_ = exact // precision loss possible for display-only formatting; not used for monetary calculations
decimalPlaces := uint32(0)
if exp := amount.Exponent(); exp < 0 {
decimalPlaces = uint32(-exp)
}
return calc.Def.FormatAmount(num.AmountFromFloat64(am, decimalPlaces)), nil
}
func FormatAmount(amount alpacadecimal.Decimal, currency string) (string, error) {
calc, err := currencyx.Code(currency).Calculator()
if err != nil {
return "", fmt.Errorf("invalid currency %q: %w", currency, err)
}
subunits := uint32(calc.Def.Subunits)
multiplier := alpacadecimal.NewFromInt(10).Pow(alpacadecimal.NewFromInt(int64(subunits)))
minorUnitAmount := amount.Mul(multiplier).Round(0).IntPart()
return calc.Def.FormatAmount(num.MakeAmount(minorUnitAmount, subunits)), nil
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/app/stripe/invoicesync/currency.go` around lines 28 - 45,
FormatAmount currently formats the raw decimal which can drift from what Stripe
actually charges; instead, call the existing rounding helper (RoundToAmount) to
convert the incoming alpacadecimal.Decimal into the rounded minor-unit amount
used for billing, then feed that rounded value into the integer/minor-unit
formatting path (the same path used when amount.IsInteger()) via the currency
calculator from currencyx.Code(currency).Calculator() so display strings always
match billed subunits; update the non-integer branch in FormatAmount to use
RoundToAmount and then FormatAmount on the resulting minor-unit value.

Comment on lines +56 to +97
func (s *Service) CreateDraftSyncPlan(ctx context.Context, input invoicesync.CreateSyncPlanInput) error {
if err := s.cancelAllActivePlans(ctx, input.Invoice.Namespace, input.Invoice.ID); err != nil {
return err
}

sessionID, ops, err := invoicesync.GenerateDraftSyncPlan(input.GeneratorInput)
if err != nil {
return fmt.Errorf("generating draft sync plan: %w", err)
}

return s.createAndPublish(ctx, input, invoicesync.SyncPlanPhaseDraft, sessionID, ops)
}

func (s *Service) CreateIssuingSyncPlan(ctx context.Context, input invoicesync.CreateSyncPlanInput) error {
if err := s.cancelAllActivePlans(ctx, input.Invoice.Namespace, input.Invoice.ID); err != nil {
return err
}

sessionID, ops, err := invoicesync.GenerateIssuingSyncPlan(input.GeneratorInput)
if err != nil {
return fmt.Errorf("generating issuing sync plan: %w", err)
}

return s.createAndPublish(ctx, input, invoicesync.SyncPlanPhaseIssuing, sessionID, ops)
}

func (s *Service) CreateDeleteSyncPlan(ctx context.Context, input invoicesync.CreateSyncPlanInput) error {
if err := s.cancelAllActivePlans(ctx, input.Invoice.Namespace, input.Invoice.ID); err != nil {
return err
}

sessionID, ops, err := invoicesync.GenerateDeleteSyncPlan(input.GeneratorInput)
if err != nil {
return fmt.Errorf("generating delete sync plan: %w", err)
}

// No-op if the invoice has no Stripe external ID.
if len(ops) == 0 {
return nil
}

return s.createAndPublish(ctx, input, invoicesync.SyncPlanPhaseDelete, sessionID, ops)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Plan creation can race an in-flight Stripe call.

cancelAllActivePlans runs without the same invoice-scoped lock that Handler.Handle uses. A concurrent delete can fail the active draft plan while InvoiceCreate is already in flight, then CreateDeleteSyncPlan sees no ExternalIDs.Invoicing and returns no ops, leaving the just-created Stripe invoice with no cleanup path.

Also applies to: 117-133

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/app/stripe/invoicesync/service/service.go` around lines 56 - 97,
The cancelAllActivePlans call in CreateDraftSyncPlan, CreateIssuingSyncPlan and
CreateDeleteSyncPlan can race with Handler.Handle because it runs without the
same invoice-scoped lock; change each Create* method to acquire the same
invoice-scoped lock used by Handler.Handle (wrap cancelAllActivePlans, the
Generate* call and the subsequent createAndPublish within that lock), hold the
lock while checking ExternalIDs for deletes so the existence check and plan
creation are atomic, and ensure proper defer/unlock to avoid deadlocks.

Comment on lines +158 to +169
// Defer publish until after the outermost transaction commits.
// If we're not inside a transaction, OnCommit executes immediately.
transaction.OnCommit(ctx, func(ctx context.Context) {
if err := s.publisher.Publish(ctx, event); err != nil {
s.logger.ErrorContext(ctx, "failed to publish sync plan event; plan will be picked up by next sync",
"plan_id", plan.ID,
"invoice_id", input.Invoice.ID,
"phase", phase,
"error", err,
)
}
})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

A publish miss here strands the persisted plan.

Once the plan row is committed, this callback only logs Publish failures. If the broker is temporarily unavailable and nothing else touches the invoice, the plan stays pending forever. This needs a retryable outbox or recovery path, not just a log line.

Comment on lines +35 to +59
// Update detailed line external IDs
for lineID, externalID := range in.LineExternalIDs {
_, err := tx.db.BillingStandardInvoiceDetailedLine.UpdateOneID(lineID).
SetInvoicingAppExternalID(externalID).
Save(ctx)
if err != nil {
if db.IsNotFound(err) {
// Line may not exist if invoice structure changed; skip
continue
}
return fmt.Errorf("updating line external ID [lineID=%s]: %w", lineID, err)
}
}

// Update discount external IDs
for discountID, externalID := range in.LineDiscountExternalIDs {
_, err := tx.db.BillingInvoiceLineDiscount.UpdateOneID(discountID).
SetInvoicingAppExternalID(externalID).
Save(ctx)
if err != nil {
if db.IsNotFound(err) {
continue
}
return fmt.Errorf("updating discount external ID [discountID=%s]: %w", discountID, err)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Scope child-row writes to the invoice being synced.

UpdateOneID(lineID) and UpdateOneID(discountID) trust the incoming IDs completely. If a stale or mismatched SyncExternalIDsInput ever carries a child ID from another invoice, this will mutate that other row instead of turning into a harmless skip. Please tie both updates back to in.Invoice ownership before saving.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@openmeter/billing/adapter/invoiceapp.go` around lines 35 - 59, The loops
update child rows by ID only, which can mutate rows belonging to other invoices;
restrict updates to children owned by the invoice in in.Invoice. For
BillingStandardInvoiceDetailedLine and BillingInvoiceLineDiscount (the
UpdateOneID calls on tx.db.BillingStandardInvoiceDetailedLine and
tx.db.BillingInvoiceLineDiscount), change the update builder to include a
predicate that the child’s parent invoice equals in.Invoice (e.g. use a
Where/Has... predicate or an Update().Where(... ID == lineID AND invoice_id ==
in.Invoice) variant) so that if the ID does not belong to the invoice the
operation becomes a no-op/skip rather than updating another invoice’s row; keep
the existing NotFound handling for skipped rows.

Comment on lines +49 to +65
// Initialize post-commit callback collection for the outermost transaction
if isOutermost {
ctx = initPostCommitCallbacks(ctx)
}

// Execute the callback and manage the transaction
return manage(ctx, tx, func(ctx context.Context, tx Driver) (R, error) {
result, err := manage(ctx, tx, func(ctx context.Context, tx Driver) (R, error) {
return cb(ctx)
})
if err != nil {
return def, err
}

// Run post-commit callbacks after the outermost transaction commits
if isOutermost {
runPostCommitCallbacks(ctx)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Run post-commit callbacks on a clean context.

ctx still carries the driver from Line 42 and the post-commit queue marker, so callback code that goes back through transaction-aware services can accidentally reuse a committed transaction. It also means a nested OnCommit inside a callback just appends to a queue that’s already being drained and never runs. Please invoke post-commit work with a tx-free, callback-free context here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/framework/transaction/transaction.go` around lines 49 - 65, The
post-commit callbacks are being invoked with the same ctx that still contains
the transaction Driver and the post-commit queue marker, which can cause
callback code to reuse the committed tx or append to the draining queue; fix by
creating a tx-free, callback-free context before calling runPostCommitCallbacks
(e.g. add a helper clearTransactionContext(ctx) that removes the transaction key
and the post-commit-queue marker from context and return that cleaned context),
then call runPostCommitCallbacks(cleanCtx) instead of
runPostCommitCallbacks(ctx); reference initPostCommitCallbacks,
runPostCommitCallbacks, manage, and Driver to locate where to clear the context
and ensure nested OnCommit inside callbacks uses a fresh queue.

hekike and others added 4 commits March 30, 2026 20:41
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

release-note/feature Release note: Exciting New Features

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant