-
Notifications
You must be signed in to change notification settings - Fork 5
fix(planetscale): prevent silent data loss on vstream context canceled errors #76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -152,9 +152,12 @@ func (p connectClient) Read(ctx context.Context, logger DatabaseLogger, ps Plane | |
| } | ||
| if err != nil { | ||
| if s, ok := status.FromError(err); ok { | ||
| // if the error is anything other than server timeout, keep going | ||
| if s.Code() != codes.DeadlineExceeded { | ||
| logger.Warning(fmt.Sprintf("%vGot error [%v] with message [%q], Returning with cursor :[%v] after non-timeout error", preamble, s.Code(), err, currentPosition)) | ||
| // context.Canceled is treated the same as DeadlineExceeded: both are | ||
| // transient connectivity errors where retrying from the last safe cursor | ||
| // is correct. Returning immediately on Canceled would checkpoint an | ||
| // advanced cursor whose rows may not have been delivered yet. | ||
| if s.Code() != codes.DeadlineExceeded && s.Code() != codes.Canceled { | ||
| logger.Warning(fmt.Sprintf("%vGot error [%v] with message [%q], Returning with cursor :[%v] after non-retryable error", preamble, s.Code(), err, currentPosition)) | ||
|
|
||
| // Check for binlog expiration error and reset cursor for historical sync | ||
| if IsBinlogsExpirationError(err) { | ||
|
|
@@ -291,13 +294,31 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam | |
|
|
||
| // stop when we've reached the well known stop position for this sync session. | ||
| watchForVgGtidChange := false | ||
| // lastSafeTC tracks the last cursor position where rows were confirmed written to | ||
| // the destination. VStream sends the VGTID event before the corresponding row events, | ||
| // so tc may advance to a new position before those rows are received. On a context | ||
|
Contributor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is factually incorrect and should not be merged as-is.
The actual ordering, documented in The VGTID comes after rows, not before. The edge-gateway's
This cannot happen within a transaction. A cursor-only If |
||
| // cancellation, returning lastSafeTC ensures the next sync re-streams from a position | ||
| // where all rows are known to have been delivered, preventing silent data loss. | ||
| lastSafeTC := tc | ||
| for { | ||
|
|
||
| res, err := c.Recv() | ||
| if err != nil { | ||
| return tc, err | ||
| if errors.Is(err, io.EOF) { | ||
| // Natural end of stream: all rows up to tc have been delivered. | ||
| return tc, err | ||
|
Contributor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The EOF special-case here is correct — on clean stream termination, all rows have been delivered, so returning `tc` is right. But for the non-EOF error path on the next line (`return lastSafeTC, err`), returning the earlier cursor only causes already-delivered rows to be streamed again. Not a correctness issue (re-streaming is safe), but worth understanding that this isn't preventing data loss — it's adding redundant replay. |
||
| } | ||
| // Mid-stream error (e.g. context canceled): return the last cursor | ||
| // where rows were confirmed written, not the potentially advanced tc. | ||
| return lastSafeTC, err | ||
| } | ||
|
|
||
| // Determine whether this response carries any row data before processing. | ||
| // A cursor-only response (bare VGTID event) has no rows and should not | ||
| // advance lastSafeTC — if the stream is canceled after such a response, | ||
| // the rows for that VGTID haven't arrived yet and must be re-streamed. | ||
| rowsInResponse := len(res.Result) > 0 || len(res.Deletes) > 0 || len(res.Updates) > 0 | ||
|
|
||
| if onResult != nil { | ||
| for _, insertedRow := range res.Result { | ||
| qr := sqltypes.Proto3ToResult(insertedRow) | ||
|
|
@@ -307,7 +328,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam | |
| } | ||
| sqlResult.Rows = append(sqlResult.Rows, row) | ||
| if err := onResult(sqlResult, OpType_Insert); err != nil { | ||
| return tc, status.Error(codes.Internal, "unable to serialize row") | ||
| return lastSafeTC, status.Error(codes.Internal, "unable to serialize row") | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -320,7 +341,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam | |
| } | ||
|
Contributor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a genuine bug fix — the old code returned `nil` as the cursor on this error path, which the caller then dereferences:
    currentPosition, err = p.sync(...)
    if currentPosition.Position != "" { // nil pointer panic
Same for the update serialization error at line 354. This fix should be extracted and merged on its own — it doesn't depend on the |
||
| sqlResult.Rows = append(sqlResult.Rows, row) | ||
| if err := onResult(sqlResult, OpType_Delete); err != nil { | ||
| return nil, status.Error(codes.Internal, "unable to serialize row") | ||
| return lastSafeTC, status.Error(codes.Internal, "unable to serialize row") | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -333,13 +354,19 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam | |
| After: serializeQueryResult(update.After), | ||
| } | ||
| if err := onUpdate(updatedRow); err != nil { | ||
| return nil, status.Error(codes.Internal, "unable to serialize update") | ||
| return lastSafeTC, status.Error(codes.Internal, "unable to serialize update") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if res.Cursor != nil { | ||
| tc = res.Cursor | ||
| // Only advance lastSafeTC when rows have been written at this cursor. | ||
| // A cursor-only VGTID event leaves lastSafeTC at its previous value so | ||
| // that a subsequent cancellation rolls back to a safe replay point. | ||
| if rowsInResponse { | ||
| lastSafeTC = tc | ||
| } | ||
| } | ||
|
|
||
| // Because of the ordering of events in a vstream | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the most concerning change in the PR. Adding `codes.Canceled` to the retry set will cause the connector to hang for up to ~5 minutes after Fivetran requests shutdown.

`codes.Canceled` from `c.Recv()` means the client's own context was canceled. The only way this happens (vs. `DeadlineExceeded`) is if the parent context is canceled — i.e., the Fivetran harness is terminating the sync session.

When `Read()` retries, `sync()` creates a new child context derived from `ctx`. But `ctx` is the already-canceled parent context, so the new child is born canceled. Each retry fails instantly, but the backoff sleep still runs (10s, 20s, 40s, 80s, 160s = ~5 min of sleeping).

The edge-gateway already handles `context.Canceled` as a clean shutdown, returning nil (clean EOF) to the client. So this error code represents a legitimate shutdown signal, not a transient failure.

If this change is kept, it must check `ctx.Err() != nil` before entering the retry loop to avoid hanging on shutdown. But I'd argue `codes.Canceled` simply shouldn't be retried at all — it's not the same class of error as `DeadlineExceeded`.