41 changes: 34 additions & 7 deletions lib/connect_client.go
@@ -152,9 +152,12 @@ func (p connectClient) Read(ctx context.Context, logger DatabaseLogger, ps Plane
}
if err != nil {
if s, ok := status.FromError(err); ok {
// if the error is anything other than server timeout, keep going
if s.Code() != codes.DeadlineExceeded {
logger.Warning(fmt.Sprintf("%vGot error [%v] with message [%q], Returning with cursor :[%v] after non-timeout error", preamble, s.Code(), err, currentPosition))
// context.Canceled is treated the same as DeadlineExceeded: both are
// transient connectivity errors where retrying from the last safe cursor
// is correct. Returning immediately on Canceled would checkpoint an
// advanced cursor whose rows may not have been delivered yet.
if s.Code() != codes.DeadlineExceeded && s.Code() != codes.Canceled {
@nickvanw (Contributor), Mar 20, 2026:
This is the most concerning change in the PR. Adding codes.Canceled to the retry set will cause the connector to hang for up to ~5 minutes after Fivetran requests shutdown.

codes.Canceled from c.Recv() means the client's own context was canceled. The only way this happens (vs DeadlineExceeded) is if the parent context is canceled — i.e., the Fivetran harness is terminating the sync session.

When Read() retries, sync() creates a new child context:

ctx, cancel := context.WithTimeout(ctx, readDuration)

But ctx is the already-canceled parent context, so the new child is born canceled. Each retry fails instantly, but the backoff sleep still runs (10s, 20s, 40s, 80s, 160s = ~5 min of sleeping).

The edge-gateway already handles context.Canceled as a clean shutdown, returning nil (clean EOF) to the client. So this error code represents a legitimate shutdown signal, not a transient failure.

If this change is kept, it must check ctx.Err() != nil before entering the retry loop to avoid hanging on shutdown. But I'd argue codes.Canceled simply shouldn't be retried at all — it's not the same class of error as DeadlineExceeded.

logger.Warning(fmt.Sprintf("%vGot error [%v] with message [%q], Returning with cursor :[%v] after non-retryable error", preamble, s.Code(), err, currentPosition))

// Check for binlog expiration error and reset cursor for historical sync
if IsBinlogsExpirationError(err) {
@@ -291,13 +294,31 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam

// stop when we've reached the well known stop position for this sync session.
watchForVgGtidChange := false
// lastSafeTC tracks the last cursor position where rows were confirmed written to
// the destination. VStream sends the VGTID event before the corresponding row events,
// so tc may advance to a new position before those rows are received. On a context
@nickvanw (Contributor), Mar 20, 2026:
This comment is factually incorrect and should not be merged as-is.

VStream sends the VGTID event before the corresponding row events

The actual ordering, documented in vstreamer.go:244-248:

BEGIN -> ROWs -> GTID -> COMMIT

The VGTID comes after rows, not before. The edge-gateway's handleMsg processes all events in a single VStream message (ROWs and VGTID together) and sends one SyncResponse with both cursor and rows in the same message.

tc may advance to a new position before those rows are received

This cannot happen within a transaction. A cursor-only SyncResponse occurs when a transaction has no matching rows for this table (heartbeat, DDL, different-table transaction). In that case, there are no rows to lose — the old code was correct to advance tc.

If lastSafeTC is kept as defensive hardening, the comment should be rewritten to accurately describe the scenarios it guards against (e.g., "cursor-only responses from heartbeats or unrelated transactions"), not an event ordering that doesn't exist.

// cancellation, returning lastSafeTC ensures the next sync re-streams from a position
// where all rows are known to have been delivered, preventing silent data loss.
lastSafeTC := tc
for {

res, err := c.Recv()
if err != nil {
return tc, err
if errors.Is(err, io.EOF) {
// Natural end of stream: all rows up to tc have been delivered.
return tc, err
Contributor:

The EOF special-case here is correct — on clean stream termination, all rows have been delivered, so returning tc is right.

But for the non-EOF error path on the next line (return lastSafeTC, err): this is overly conservative given the actual protocol behavior. Since the edge-gateway sends cursor + rows in the same SyncResponse, tc and lastSafeTC will differ only when the most recent response was a cursor-only heartbeat/DDL/unrelated-table event — meaning there are no rows at that position to lose. Returning lastSafeTC just forces unnecessary re-streaming of events that have no rows for this table.

Not a correctness issue (re-streaming is safe), but worth understanding that this isn't preventing data loss — it's adding redundant replay.
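The divergence the reviewer describes can be modeled directly. In this sketch, `Cursor`, `SyncResponse`, and `track` are simplified stand-ins for the real protocol types, assumed only for illustration: because rows and cursor arrive in the same response, `tc` and `lastSafeTC` separate only across cursor-only responses (heartbeat, DDL, unrelated-table transaction), where no rows exist to lose.

```go
package main

import "fmt"

// Cursor and SyncResponse are hypothetical stand-ins for the protocol types.
type Cursor struct{ Position string }

type SyncResponse struct {
	Cursor *Cursor
	Rows   int // stand-in for Result/Deletes/Updates counts
}

// track applies the lastSafeTC rule from the diff: advance tc on every
// cursor, but advance lastSafeTC only when the response also carried rows.
func track(responses []SyncResponse) (tc, lastSafeTC *Cursor) {
	for _, res := range responses {
		rowsInResponse := res.Rows > 0
		if res.Cursor != nil {
			tc = res.Cursor
			if rowsInResponse {
				lastSafeTC = tc
			}
		}
	}
	return tc, lastSafeTC
}

func main() {
	// Rows and cursor arrive together, then a cursor-only heartbeat:
	tc, safe := track([]SyncResponse{
		{Cursor: &Cursor{"gtid-1"}, Rows: 3},
		{Cursor: &Cursor{"gtid-2"}, Rows: 0}, // no rows exist at gtid-2
	})
	fmt.Println(tc.Position, safe.Position) // gtid-2 gtid-1
}
```

Returning `safe` here replays from gtid-1, re-streaming a span that contains no rows for this table — safe, but redundant, exactly as the comment argues.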

}
// Mid-stream error (e.g. context canceled): return the last cursor
// where rows were confirmed written, not the potentially advanced tc.
return lastSafeTC, err
}

// Determine whether this response carries any row data before processing.
// A cursor-only response (bare VGTID event) has no rows and should not
// advance lastSafeTC — if the stream is canceled after such a response,
// the rows for that VGTID haven't arrived yet and must be re-streamed.
rowsInResponse := len(res.Result) > 0 || len(res.Deletes) > 0 || len(res.Updates) > 0

if onResult != nil {
for _, insertedRow := range res.Result {
qr := sqltypes.Proto3ToResult(insertedRow)
@@ -307,7 +328,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam
}
sqlResult.Rows = append(sqlResult.Rows, row)
if err := onResult(sqlResult, OpType_Insert); err != nil {
return tc, status.Error(codes.Internal, "unable to serialize row")
return lastSafeTC, status.Error(codes.Internal, "unable to serialize row")
}
}
}
@@ -320,7 +341,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam
}
Contributor:

This is a genuine bug fix — the old code returned nil here, and Read() immediately dereferences the result:

currentPosition, err = p.sync(...)
if currentPosition.Position != "" {  // nil pointer panic

Same for the update serialization error at line 354. This fix should be extracted and merged on its own — it doesn't depend on the lastSafeTC or codes.Canceled changes.
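The panic the reviewer identifies is easy to reproduce in miniature. `TableCursor` and `readCursor` below are hypothetical stand-ins, assumed only to show the mechanism: a caller that dereferences the returned cursor panics when `sync` returns nil, and does not when it returns the last safe cursor.

```go
package main

import "fmt"

// TableCursor is a hypothetical stand-in for the cursor type returned by sync.
type TableCursor struct{ Position string }

// readCursor mimics Read() dereferencing sync()'s result. Passing nil (the old
// error-path return value) panics at the field access; recover converts the
// panic into a boolean so the two behaviors can be compared side by side.
func readCursor(c *TableCursor) (pos string, panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	if c.Position != "" { // nil pointer dereference when c == nil
		pos = c.Position
	}
	return pos, panicked
}

func main() {
	_, p := readCursor(nil)
	fmt.Println(p) // true: the old nil return panics in the caller

	_, p = readCursor(&TableCursor{Position: "gtid-7"})
	fmt.Println(p) // false: a non-nil cursor is safe to dereference
}
```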

sqlResult.Rows = append(sqlResult.Rows, row)
if err := onResult(sqlResult, OpType_Delete); err != nil {
return nil, status.Error(codes.Internal, "unable to serialize row")
return lastSafeTC, status.Error(codes.Internal, "unable to serialize row")
}
}
}
@@ -333,13 +354,19 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam
After: serializeQueryResult(update.After),
}
if err := onUpdate(updatedRow); err != nil {
return nil, status.Error(codes.Internal, "unable to serialize update")
return lastSafeTC, status.Error(codes.Internal, "unable to serialize update")
}
}
}

if res.Cursor != nil {
tc = res.Cursor
// Only advance lastSafeTC when rows have been written at this cursor.
// A cursor-only VGTID event leaves lastSafeTC at its previous value so
// that a subsequent cancellation rolls back to a safe replay point.
if rowsInResponse {
lastSafeTC = tc
}
}

// Because of the ordering of events in a vstream