@@ -19,6 +19,7 @@

import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;

import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.SourceInfo;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.time.Conversions;
@@ -43,7 +44,34 @@ public class PostgresOffset extends Offset {

// used by PostgresOffsetFactory
PostgresOffset(Map<String, String> offset) {
this.offset = offset;
Map<String, String> filtered = new HashMap<>(offset);
// When a checkpoint is taken right after a COMMIT (state-3a), all three LSN fields
Comment on lines 46 to +48 (Copilot AI, Mar 30, 2026):

PostgresOffset.of(...) already builds a new HashMap (offsetStrMap), and this constructor then creates another HashMap copy (filtered). Since getStreamOffset() is called per streamed record (see IncrementalSourceStreamFetcher.shouldEmit), this adds an extra allocation per event. Consider avoiding the second copy in the common case (e.g., only copy when the triple-equality condition is met, otherwise keep the passed-in map), or perform the filtering before constructing the PostgresOffset when the input map is known to be newly created.
// converge to the same value: lsn == lsn_proc == lsn_commit.
// Recovering from such a checkpoint constructs WalPositionLocator(C0, C0), which
// causes the first new transaction's DML records (data_start=C0 in pgoutput) to be
// silently dropped: they are added to lsnSeen during the find phase, but
// startStreamingLsn is set to the next COMMIT (C1), so they are filtered in the
// stream phase.
//
// Fix: when lsn == lsn_proc == lsn_commit, remove lsn_proc and lsn_commit so that
// WalPositionLocator is constructed with lastCommitStoredLsn=null, which triggers
// the fast path: startStreamingLsn=firstLsnReceived=C0, all messages pass through.
//
// The triple-equality condition is safe: mid-transaction checkpoints (state-3b) have
// lsn_commit pointing to the previous commit, so lsn_commit != lsn, and this branch
// is not taken.
//
// This workaround can be removed once Debezium is upgraded to a version that
// includes DBZ-6204:
// https://github.com/debezium/debezium/commit/3b5740f1a836c8b438888f2458ebb1554320bac7
String lsnVal = filtered.get(SourceInfo.LSN_KEY);
String lsnProc = filtered.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
String lsnCommit = filtered.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
if (lsnVal != null && lsnVal.equals(lsnProc) && lsnVal.equals(lsnCommit)) {
filtered.remove(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY);
filtered.remove(PostgresOffsetContext.LAST_COMMIT_LSN_KEY);
}
Comment on lines +56 to +73 (Copilot AI, Mar 30, 2026):

The PR description says to remove lsn_proc and lsn_commit from PostgresOffset when constructed from a Debezium source offset, but the implementation only removes them when lsn == lsn_proc == lsn_commit. Please clarify whether the fix is intentionally limited to this state-3a case: if so, update the PR description accordingly; otherwise, broaden the filtering logic so recovery offsets always omit those keys as described.
this.offset = filtered;
Comment on lines +47 to +74 (Copilot AI, Mar 30, 2026):

This change addresses a data-loss scenario after checkpoint recovery, but there is no accompanying regression test exercising recovery with lsn == lsn_proc == lsn_commit and non-streaming pgoutput, where the first new transaction reuses the prior COMMIT data_start. Adding an ITCase/unit test that fails on the old behavior and passes with this filtering would help prevent regressions (e.g., extend the existing checkpoint/restore tests in PostgreSQLSourceTest).
}
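The first review comment above suggests copying the map only when the triple-equality condition actually holds, so the per-record getStreamOffset() path pays no extra allocation. A minimal standalone sketch of that variant follows; the string literals stand in for SourceInfo.LSN_KEY and the two PostgresOffsetContext key constants, and this is an illustration of the suggestion, not the committed implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class LsnOffsetFilter {
    // Hypothetical stand-ins for the Debezium offset keys; the real
    // constructor would use SourceInfo.LSN_KEY and the
    // PostgresOffsetContext constants instead of literals.
    static final String LSN = "lsn";
    static final String LSN_PROC = "lsn_proc";
    static final String LSN_COMMIT = "lsn_commit";

    /**
     * Returns the input map unchanged in the common case, and copies it
     * only when lsn == lsn_proc == lsn_commit (the state-3a checkpoint),
     * in which case the two converged keys are stripped from the copy.
     */
    static Map<String, String> filterConvergedLsn(Map<String, String> offset) {
        String lsn = offset.get(LSN);
        if (lsn == null
                || !lsn.equals(offset.get(LSN_PROC))
                || !lsn.equals(offset.get(LSN_COMMIT))) {
            return offset; // common case: no extra allocation
        }
        Map<String, String> filtered = new HashMap<>(offset);
        filtered.remove(LSN_PROC);
        filtered.remove(LSN_COMMIT);
        return filtered;
    }

    public static void main(String[] args) {
        // state-3a: all three LSNs converged, keys must be stripped.
        Map<String, String> converged = new HashMap<>();
        converged.put(LSN, "0/16B6C50");
        converged.put(LSN_PROC, "0/16B6C50");
        converged.put(LSN_COMMIT, "0/16B6C50");
        Map<String, String> out = filterConvergedLsn(converged);
        if (out.containsKey(LSN_PROC) || out.containsKey(LSN_COMMIT)) {
            throw new AssertionError("converged LSNs should be filtered");
        }

        // Mid-transaction offset: lsn_commit differs, same instance returned.
        Map<String, String> midTx = new HashMap<>();
        midTx.put(LSN, "0/16B6D00");
        midTx.put(LSN_COMMIT, "0/16B6C50"); // previous commit, differs
        if (filterConvergedLsn(midTx) != midTx) {
            throw new AssertionError("mid-transaction offset should not be copied");
        }
        System.out.println("ok");
    }
}
```

Performing the same check before the copy inside the constructor would preserve the committed semantics while dropping the second allocation on the hot path.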

PostgresOffset(Long lsn, Long txId, Instant lastCommitTs) {
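A full regression test for the last review comment would need the connector's checkpoint/restore harness, but the filtering rule itself can be pinned down with plain assertions over the two checkpoint states described in the code comment (state-3a converged, state-3b mid-transaction). The mirrored logic and string keys below are assumptions standing in for the committed class, not the class itself:

```java
import java.util.HashMap;
import java.util.Map;

public class PostgresOffsetFilterTest {

    // Mirrors the committed constructor logic: copy the map, then strip
    // lsn_proc and lsn_commit only when all three values are equal.
    static Map<String, String> filter(Map<String, String> offset) {
        Map<String, String> filtered = new HashMap<>(offset);
        String lsn = filtered.get("lsn");
        if (lsn != null
                && lsn.equals(filtered.get("lsn_proc"))
                && lsn.equals(filtered.get("lsn_commit"))) {
            filtered.remove("lsn_proc");
            filtered.remove("lsn_commit");
        }
        return filtered;
    }

    public static void main(String[] args) {
        // state-3a: checkpoint right after COMMIT, all three LSNs converge;
        // the two derived keys must be dropped so recovery takes the fast path.
        Map<String, String> state3a = new HashMap<>();
        state3a.put("lsn", "0/16B6C50");
        state3a.put("lsn_proc", "0/16B6C50");
        state3a.put("lsn_commit", "0/16B6C50");
        Map<String, String> out3a = filter(state3a);
        check(!out3a.containsKey("lsn_proc"), "state-3a must drop lsn_proc");
        check(!out3a.containsKey("lsn_commit"), "state-3a must drop lsn_commit");
        check("0/16B6C50".equals(out3a.get("lsn")), "state-3a must keep lsn");

        // state-3b: mid-transaction checkpoint, lsn_commit still points at
        // the previous commit, so nothing may be removed.
        Map<String, String> state3b = new HashMap<>();
        state3b.put("lsn", "0/16B6D00");
        state3b.put("lsn_proc", "0/16B6D00");
        state3b.put("lsn_commit", "0/16B6C50");
        Map<String, String> out3b = filter(state3b);
        check(out3b.containsKey("lsn_proc"), "state-3b must keep lsn_proc");
        check(out3b.containsKey("lsn_commit"), "state-3b must keep lsn_commit");

        System.out.println("ok");
    }

    static void check(boolean cond, String msg) {
        if (!cond) throw new AssertionError(msg);
    }
}
```

An actual ITCase along the lines the reviewer suggests would instead checkpoint right after a COMMIT, restore the source, write one more transaction, and assert that its DML events are emitted rather than silently dropped.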