
fix(planetscale): prevent silent data loss on vstream context canceled errors#76

Open
dev-menon wants to merge 2 commits into planetscale:main from dev-menon:patch-1

Conversation

Contributor

@dev-menon dev-menon commented Mar 20, 2026

Summary

Fixes a silent data loss bug in the PlanetScale connector where records could be permanently skipped when a VStream context canceled error occurred during an incremental sync.

Root Cause

  1. The VStream protocol sends a VGTID event (cursor position update) before the row events for that transaction — a SyncResponse with a new cursor may arrive with no rows, with rows following in subsequent messages.
  2. The previous code updated the internal cursor tc immediately upon receiving any cursor-bearing response, including bare VGTID events with no rows.
  3. If a context canceled error fired after the VGTID event advanced tc but before the corresponding row events arrived, sync() returned the advanced cursor without having written those rows.
  4. Read() treated context canceled as a non-retryable error and returned immediately, passing the unsafe cursor up to sync.go.
  5. sync.go checkpointed that advanced cursor, permanently skipping the in-flight records with no error surfaced to the user.
  6. This is the exact failure mode observed in the January 2026 incident where 3 records in the billing_agreement_line table had missing columns from the destination after a sync that logged rpc error: code = Canceled desc = context canceled errors.

Changes

lib/connect_client.go: sync()

Introduces lastSafeTC, a cursor that only advances when row data has been confirmed written to the destination. A bare VGTID event (cursor with no rows) does not advance lastSafeTC. On any c.Recv() error, the function now returns lastSafeTC instead of tc, ensuring the caller always receives a cursor position from which a safe re-stream is possible.

lib/connect_client.go: Read()

Adds codes.Canceled to the retryable error set alongside the existing codes.DeadlineExceeded handling. Previously, context canceled was treated as a permanent, non-retryable error that returned immediately. It is now routed through the existing exponential backoff and maxConsecutiveTimeouts retry loop (up to 5 retries, backoff starting at 10s capped at 5 minutes), consistent with how server-side timeouts are handled.

Effect

On a context canceled during streaming:

  1. sync() returns lastSafeTC — the last cursor where rows were confirmed written
  2. Read() retries from that safe cursor with backoff rather than returning immediately
  3. The next sync attempt re-streams from lastSafeTC, re-delivering any rows that were in-flight during the cancellation
  4. Re-delivered rows are idempotent (Fivetran upserts), so no duplicate data is introduced

@dev-menon dev-menon marked this pull request as ready for review March 20, 2026 09:56
Contributor

@nickvanw nickvanw left a comment


Review Summary

This PR has two changes: (1) a lastSafeTC cursor in sync() that only advances when rows are present, and (2) adding codes.Canceled to the retryable error set in Read(). The stated motivation is preventing silent data loss from a VStream event ordering issue. After tracing the data flow through Vitess, the edge-gateway, and this connector, the core premise of this PR is factually incorrect, and the codes.Canceled retry change will make the connector worse in production.


The VStream event ordering claim is wrong

The PR description and code comments state:

"VStream sends the VGTID event before the corresponding row events for that transaction"

I verified this against three layers of the stack:

1. Vitess vstreamer (vstreamer.go:244-248):

// Only the following patterns are possible:
// BEGIN->ROWs or Statements->GTID->COMMIT

The GTID (which becomes VGTID at VTGate) comes after all row events, not before. This is the MySQL binlog ordering and is not configurable.

2. VTGate vstream_manager (vstream_manager.go:747-789, 1003-1062): Events are accumulated into a batch (BEGIN, FIELD, ROW, ..., then COMMIT), and the entire completed transaction is sent atomically. The GTID→VGTID conversion happens in-place within the same event array, preserving the after-rows ordering.

3. Edge-gateway psdbconnect handler (handlers.go:121-254): handleMsg iterates over ALL events in a single VStream message in one pass — ROW events accumulate into inserts/updates/deletes, and the VGTID event sets the position. After the loop completes, a single SyncResponse is sent containing both the cursor and the rows together. Within a transaction, the cursor and its rows always arrive in the same SyncResponse.

Can cursor-only SyncResponse messages happen? Yes — for heartbeats, DDL events, and transactions that touch other tables (no matching rows for the streamed table). But in those cases, the VGTID position reflects a transaction that genuinely had zero rows for this table. Resuming from that position with the old code is completely safe — there are no rows to skip.

The specific data loss scenario described — "VGTID arrives, context cancels, rows for that VGTID never delivered, cursor checkpointed past them" — cannot occur with the current implementation. The cursor and its rows are always in the same gRPC message.


The codes.Canceled retry change will cause the connector to hang on shutdown

sync() creates a child context from the parent:

ctx, cancel := context.WithTimeout(ctx, readDuration)

A codes.Canceled error from c.Recv() means the client's context was canceled. There are two cases:

  1. The child timeout fires → produces codes.DeadlineExceeded, already handled.
  2. The parent context is canceled (Fivetran terminates the sync) → produces codes.Canceled.

In case 2, retrying is futile: Read() calls sync() again, which calls context.WithTimeout(canceledParentCtx, ...), which inherits the cancellation immediately. Each retry fails instantly, but the backoff time.Sleep still runs. With 5 retries and exponential backoff (10s → 20s → 40s → 80s → 160s), the connector will hang for up to ~5 minutes after Fivetran has already asked it to stop.

The edge-gateway itself already treats context.Canceled as a clean shutdown (handlers.go:257-259):

if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
    return nil
}

So the server closes the stream cleanly. The codes.Canceled the client sees is from its own canceled context, not a transient server error.


No tests

This PR modifies data-integrity logic and adds zero tests. The existing test suite has no coverage for error/timeout paths. At minimum:

  • A test for sync() returning lastSafeTC on error after a cursor-only response
  • A test that codes.Canceled with a live parent context retries correctly
  • A test that codes.Canceled with a dead parent context exits immediately (currently broken)

What IS good

The change from return nil, ... to return lastSafeTC, ... on delete/update serialization errors (old lines 320, 333) fixes a latent nil-pointer panic. The caller in Read() immediately dereferences the returned cursor:

currentPosition, err = p.sync(...)
if currentPosition.Position != "" {  // panics if currentPosition is nil

This fix should be extracted into its own minimal PR.


Recommendation

The January 2026 incident ("3 records in billing_agreement_line had missing columns") needs its root cause re-investigated — the failure mode described in this PR cannot occur with the current VStream/edge-gateway implementation. If there was real data loss, this fix won't prevent a recurrence.

I'd suggest:

  1. Extract the nil-return fix into a separate small PR (the return nil → return lastSafeTC changes on delete/update errors). That's a real bug worth fixing.
  2. Drop the codes.Canceled retry change — it makes shutdown behavior actively worse.
  3. If lastSafeTC is still desired as purely defensive hardening, correct the comments to describe the actual scenarios, add tests, and submit separately.

// transient connectivity errors where retrying from the last safe cursor
// is correct. Returning immediately on Canceled would checkpoint an
// advanced cursor whose rows may not have been delivered yet.
if s.Code() != codes.DeadlineExceeded && s.Code() != codes.Canceled {
Contributor

@nickvanw nickvanw Mar 20, 2026


This is the most concerning change in the PR. Adding codes.Canceled to the retry set will cause the connector to hang for up to ~5 minutes after Fivetran requests shutdown.

codes.Canceled from c.Recv() means the client's own context was canceled. The only way this happens (vs DeadlineExceeded) is if the parent context is canceled — i.e., the Fivetran harness is terminating the sync session.

When Read() retries, sync() creates a new child context:

ctx, cancel := context.WithTimeout(ctx, readDuration)

But ctx is the already-canceled parent context, so the new child is born canceled. Each retry fails instantly, but the backoff sleep still runs (10s, 20s, 40s, 80s, 160s = ~5 min of sleeping).

The edge-gateway already handles context.Canceled as a clean shutdown, returning nil (clean EOF) to the client. So this error code represents a legitimate shutdown signal, not a transient failure.

If this change is kept, it must check ctx.Err() != nil before entering the retry loop to avoid hanging on shutdown. But I'd argue codes.Canceled simply shouldn't be retried at all — it's not the same class of error as DeadlineExceeded.

watchForVgGtidChange := false
// lastSafeTC tracks the last cursor position where rows were confirmed written to
// the destination. VStream sends the VGTID event before the corresponding row events,
// so tc may advance to a new position before those rows are received. On a context
Contributor

@nickvanw nickvanw Mar 20, 2026


This comment is factually incorrect and should not be merged as-is.

VStream sends the VGTID event before the corresponding row events

The actual ordering, documented in vstreamer.go:244-248:

BEGIN -> ROWs -> GTID -> COMMIT

The VGTID comes after rows, not before. The edge-gateway's handleMsg processes all events in a single VStream message (ROWs and VGTID together) and sends one SyncResponse with both cursor and rows in the same message.

tc may advance to a new position before those rows are received

This cannot happen within a transaction. A cursor-only SyncResponse occurs when a transaction has no matching rows for this table (heartbeat, DDL, different-table transaction). In that case, there are no rows to lose — the old code was correct to advance tc.

If lastSafeTC is kept as defensive hardening, the comment should be rewritten to accurately describe the scenarios it guards against (e.g., "cursor-only responses from heartbeats or unrelated transactions"), not an event ordering that doesn't exist.

return tc, err
if errors.Is(err, io.EOF) {
// Natural end of stream: all rows up to tc have been delivered.
return tc, err
Contributor


The EOF special-case here is correct — on clean stream termination, all rows have been delivered, so returning tc is right.

But for the non-EOF error path on the next line (return lastSafeTC, err): this is overly conservative given the actual protocol behavior. Since the edge-gateway sends cursor + rows in the same SyncResponse, tc and lastSafeTC will differ only when the most recent response was a cursor-only heartbeat/DDL/unrelated-table event — meaning there are no rows at that position to lose. Returning lastSafeTC just forces unnecessary re-streaming of events that have no rows for this table.

Not a correctness issue (re-streaming is safe), but worth understanding that this isn't preventing data loss — it's adding redundant replay.

@@ -320,7 +341,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam
}
Contributor


This is a genuine bug fix — the old code returned nil here, and Read() immediately dereferences the result:

currentPosition, err = p.sync(...)
if currentPosition.Position != "" {  // nil pointer panic

Same for the update serialization error at line 354. This fix should be extracted and merged on its own — it doesn't depend on the lastSafeTC or codes.Canceled changes.

