fix(planetscale): prevent silent data loss on vstream context canceled errors #76
dev-menon wants to merge 2 commits into planetscale:main
Conversation
nickvanw left a comment
Review Summary
This PR has two changes: (1) a lastSafeTC cursor in sync() that only advances when rows are present, and (2) adding codes.Canceled to the retryable error set in Read(). The stated motivation is preventing silent data loss from a VStream event ordering issue. After tracing the data flow through Vitess, the edge-gateway, and this connector, the core premise of this PR is factually incorrect, and the codes.Canceled retry change will make the connector worse in production.
The VStream event ordering claim is wrong
The PR description and code comments state:
"VStream sends the VGTID event before the corresponding row events for that transaction"
I verified this against three layers of the stack:
1. Vitess vstreamer (vstreamer.go:244-248):
// Only the following patterns are possible:
// BEGIN->ROWs or Statements->GTID->COMMIT
The GTID (which becomes VGTID at VTGate) comes after all row events, not before. This is the MySQL binlog ordering and is not configurable.
2. VTGate vstream_manager (vstream_manager.go:747-789, 1003-1062): Events are accumulated into a batch (BEGIN, FIELD, ROW, ..., then COMMIT), and the entire completed transaction is sent atomically. The GTID→VGTID conversion happens in-place within the same event array, preserving the after-rows ordering.
3. Edge-gateway psdbconnect handler (handlers.go:121-254): handleMsg iterates over ALL events in a single VStream message in one pass — ROW events accumulate into inserts/updates/deletes, and the VGTID event sets the position. After the loop completes, a single SyncResponse is sent containing both the cursor and the rows together. Within a transaction, the cursor and its rows always arrive in the same SyncResponse.
Can cursor-only SyncResponse messages happen? Yes — for heartbeats, DDL events, and transactions that touch other tables (no matching rows for the streamed table). But in those cases, the VGTID position reflects a transaction that genuinely had zero rows for this table. Resuming from that position with the old code is completely safe — there are no rows to skip.
The specific data loss scenario described — "VGTID arrives, context cancels, rows for that VGTID never delivered, cursor checkpointed past them" — cannot occur with the current implementation. The cursor and its rows are always in the same gRPC message.
The codes.Canceled retry change will cause the connector to hang on shutdown
sync() creates a child context from the parent:
ctx, cancel := context.WithTimeout(ctx, readDuration)
A codes.Canceled error from c.Recv() means the client's context was canceled. There are two cases:
- The child timeout fires → produces codes.DeadlineExceeded, already handled.
- The parent context is canceled (Fivetran terminates the sync) → produces codes.Canceled.
In case 2, retrying is futile: Read() calls sync() again, which calls context.WithTimeout(canceledParentCtx, ...), which inherits the cancellation immediately. Each retry fails instantly, but the backoff time.Sleep still runs. With 5 retries and exponential backoff (10s → 20s → 40s → 80s → 160s), the connector will hang for up to ~5 minutes after Fivetran has already asked it to stop.
The edge-gateway itself already treats context.Canceled as a clean shutdown (handlers.go:257-259):
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return nil
}
So the server closes the stream cleanly. The codes.Canceled the client sees is from its own canceled context, not a transient server error.
No tests
This PR modifies data-integrity logic and adds zero tests. The existing test suite has no coverage for error/timeout paths. At minimum:
- A test for sync() returning lastSafeTC on error after a cursor-only response
- A test that codes.Canceled with a live parent context retries correctly
- A test that codes.Canceled with a dead parent context exits immediately (currently broken)
What IS good
The change from return nil, ... to return lastSafeTC, ... on delete/update serialization errors (old lines 320, 333) fixes a latent nil-pointer panic. The caller in Read() immediately dereferences the returned cursor:
currentPosition, err = p.sync(...)
if currentPosition.Position != "" { // panics if currentPosition is nil
This fix should be extracted into its own minimal PR.
Recommendation
The January 2026 incident ("3 records in billing_agreement_line had missing columns") needs its root cause re-investigated — the failure mode described in this PR cannot occur with the current VStream/edge-gateway implementation. If there was real data loss, this fix won't prevent a recurrence.
I'd suggest:
- Extract the nil-return fix into a separate small PR (the return nil → return lastSafeTC changes on delete/update errors). That's a real bug worth fixing.
- Drop the codes.Canceled retry change — it makes shutdown behavior actively worse.
- If lastSafeTC is still desired as purely defensive hardening, correct the comments to describe the actual scenarios, add tests, and submit separately.
// transient connectivity errors where retrying from the last safe cursor
// is correct. Returning immediately on Canceled would checkpoint an
// advanced cursor whose rows may not have been delivered yet.
if s.Code() != codes.DeadlineExceeded && s.Code() != codes.Canceled {
This is the most concerning change in the PR. Adding codes.Canceled to the retry set will cause the connector to hang for up to ~5 minutes after Fivetran requests shutdown.
codes.Canceled from c.Recv() means the client's own context was canceled. The only way this happens (vs DeadlineExceeded) is if the parent context is canceled — i.e., the Fivetran harness is terminating the sync session.
When Read() retries, sync() creates a new child context:
ctx, cancel := context.WithTimeout(ctx, readDuration)
But ctx is the already-canceled parent context, so the new child is born canceled. Each retry fails instantly, but the backoff sleep still runs (10s, 20s, 40s, 80s, 160s = ~5 min of sleeping).
The edge-gateway already handles context.Canceled as a clean shutdown, returning nil (clean EOF) to the client. So this error code represents a legitimate shutdown signal, not a transient failure.
If this change is kept, it must check ctx.Err() != nil before entering the retry loop to avoid hanging on shutdown. But I'd argue codes.Canceled simply shouldn't be retried at all — it's not the same class of error as DeadlineExceeded.
watchForVgGtidChange := false
// lastSafeTC tracks the last cursor position where rows were confirmed written to
// the destination. VStream sends the VGTID event before the corresponding row events,
// so tc may advance to a new position before those rows are received. On a context
This comment is factually incorrect and should not be merged as-is.
VStream sends the VGTID event before the corresponding row events
The actual ordering, documented in vstreamer.go:244-248:
BEGIN -> ROWs -> GTID -> COMMIT
The VGTID comes after rows, not before. The edge-gateway's handleMsg processes all events in a single VStream message (ROWs and VGTID together) and sends one SyncResponse with both cursor and rows in the same message.
tc may advance to a new position before those rows are received
This cannot happen within a transaction. A cursor-only SyncResponse occurs when a transaction has no matching rows for this table (heartbeat, DDL, different-table transaction). In that case, there are no rows to lose — the old code was correct to advance tc.
If lastSafeTC is kept as defensive hardening, the comment should be rewritten to accurately describe the scenarios it guards against (e.g., "cursor-only responses from heartbeats or unrelated transactions"), not an event ordering that doesn't exist.
return tc, err
if errors.Is(err, io.EOF) {
	// Natural end of stream: all rows up to tc have been delivered.
	return tc, err
The EOF special-case here is correct — on clean stream termination, all rows have been delivered, so returning tc is right.
But for the non-EOF error path on the next line (return lastSafeTC, err): this is overly conservative given the actual protocol behavior. Since the edge-gateway sends cursor + rows in the same SyncResponse, tc and lastSafeTC will differ only when the most recent response was a cursor-only heartbeat/DDL/unrelated-table event — meaning there are no rows at that position to lose. Returning lastSafeTC just forces unnecessary re-streaming of events that have no rows for this table.
Not a correctness issue (re-streaming is safe), but worth understanding that this isn't preventing data loss — it's adding redundant replay.
@@ -320,7 +341,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam
}
This is a genuine bug fix — the old code returned nil here, and Read() immediately dereferences the result:
currentPosition, err = p.sync(...)
if currentPosition.Position != "" { // nil pointer panic
Same for the update serialization error at line 354. This fix should be extracted and merged on its own — it doesn't depend on the lastSafeTC or codes.Canceled changes.
Summary
Fixes a silent data loss bug in the PlanetScale connector where records could be permanently skipped when a VStream
context cancelederror occurred during an incremental sync.Root Cause
- A SyncResponse with a new cursor may arrive with no rows, with rows following in subsequent messages.
- sync() advanced tc immediately upon receiving any cursor-bearing response, including bare VGTID events with no rows.
- When a context canceled error fired after the VGTID event advanced tc but before the corresponding row events arrived, sync() returned the advanced cursor without having written those rows.
- Read() treated context canceled as a non-retryable error and returned immediately, passing the unsafe cursor up to sync.go.
- sync.go checkpointed that advanced cursor, permanently skipping the in-flight records with no error surfaced to the user.
- In the January 2026 incident, 3 records in the billing_agreement_line table had missing columns in the destination after a sync that logged rpc error: code = Canceled desc = context canceled errors.
lib/connect_client.go — sync()
Introduces lastSafeTC, a cursor that only advances when row data has been confirmed written to the destination. A bare VGTID event (cursor with no rows) does not advance lastSafeTC. On any c.Recv() error, the function now returns lastSafeTC instead of tc, ensuring the caller always receives a cursor position from which a safe re-stream is possible.
lib/connect_client.go — Read()
Adds codes.Canceled to the retryable error set alongside the existing codes.DeadlineExceeded handling. Previously, context canceled was treated as a permanent, non-retryable error that returned immediately. It is now routed through the existing exponential backoff and maxConsecutiveTimeouts retry loop (up to 5 retries, backoff starting at 10s capped at 5 minutes), consistent with how server-side timeouts are handled.
Effect
On a context canceled during streaming:
- sync() returns lastSafeTC — the last cursor where rows were confirmed written
- Read() retries from that safe cursor with backoff rather than returning immediately
- On success, streaming resumes from lastSafeTC, re-delivering any rows that were in-flight during the cancellation