Skip to content

[fix](streamingjob) fix postgres DML silently dropped on task restart#61481

Open
JNSimba wants to merge 2 commits intoapache:masterfrom
JNSimba:fix_pg_loss_bug
Open

[fix](streamingjob) fix postgres DML silently dropped on task restart#61481
JNSimba wants to merge 2 commits intoapache:masterfrom
JNSimba:fix_pg_loss_bug

Conversation

@JNSimba
Copy link
Member

@JNSimba JNSimba commented Mar 18, 2026

What problem does this PR solve?

Problem

When a streaming job restarts a task, the first DML of the new transaction
is occasionally silently dropped (10-20% failure rate). The affected record
never appears in the Doris target table, with no error logged — only
"identified as already processed" in cdc-client.log.

Root Cause

debezium 1.9.x hardcodes proto_version=1 (non-streaming pgoutput) for all
PG versions. In non-streaming mode, the walsender batches all changes of a
transaction and sends them after COMMIT, and all messages (BEGIN + DML) share
the same XLogData.data_start = the transaction's begin_lsn.

When this begin_lsn equals the previous transaction's commit_lsn (i.e.
the two transactions are adjacent in WAL with no other writes between them),
WalPositionLocator behaves incorrectly:

  1. Find phase: COMMIT(T1) at lsn=Y sets storeLsnAfterLastEventStoredLsn=true.
    BEGIN(T2) and INSERT(T2) both have lsn=Y, so they keep returning
    Optional.empty(). Only COMMIT(T2) at lsn=Z sets
    startStreamingLsn=Z, with lsnSeen={Y, Z}.

  2. Actual streaming: INSERT(T2) arrives with lastReceiveLsn=Y.
    skipMessage(Y): Y ∈ lsnSeen and Y ≠ startStreamingLsn(Z) → filtered.

The bug is intermittent because it only triggers when no other WAL activity
(autovacuum, other connections) occurs between the two transactions.

Fix

Override extractBinlogStateOffset() in PostgresSourceReader to strip
lsn_proc and lsn_commit from the offset before it is passed to debezium.
This constructs WalPositionLocator(lastCommitStoredLsn=null, lsn=Y), which
causes the find phase to exit immediately at the first received message
(startStreamingLsn=Y). In actual streaming, COMMIT(T1) triggers
switch-off (lastReceiveLsn=Y = startStreamingLsn), and all subsequent
messages including INSERT(T2) pass through.

See https://issues.apache.org/jira/browse/FLINK-39265.

Test

Run test_streaming_postgres_job multiple times. Before this fix the
'Apache' assertion fails ~10-20% of the time; after this fix it passes
consistently.

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@JNSimba
Copy link
Member Author

JNSimba commented Mar 18, 2026

run buildall

@Thearas
Copy link
Contributor

Thearas commented Mar 18, 2026

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@JNSimba
Copy link
Member Author

JNSimba commented Mar 18, 2026

run buildall

@doris-robot
Copy link

TPC-H: Total hot run time: 27048 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit 314e84b8a192ae2ead690f17b9d4a6d6a9b5839f, data reload: false

------ Round 1 ----------------------------------
orders	Doris	NULL	NULL	0	0	0	NULL	0	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	17600	4428	4284	4284
q2	q3	10640	771	544	544
q4	4680	355	251	251
q5	7561	1224	1012	1012
q6	187	178	153	153
q7	794	840	667	667
q8	9304	1514	1449	1449
q9	4891	4745	4762	4745
q10	6305	1910	1659	1659
q11	467	288	239	239
q12	692	581	468	468
q13	18039	2926	2183	2183
q14	223	231	223	223
q15	q16	758	731	667	667
q17	723	902	436	436
q18	5974	5403	5243	5243
q19	1293	999	621	621
q20	562	500	376	376
q21	4820	1873	1530	1530
q22	482	364	298	298
Total cold run time: 95995 ms
Total hot run time: 27048 ms

----- Round 2, with runtime_filter_mode=off -----
orders	Doris	NULL	NULL	150000000	42	6422171781	NULL	22778155	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	4771	4603	4664	4603
q2	q3	3889	4354	3822	3822
q4	944	1264	818	818
q5	4094	4389	4390	4389
q6	185	174	144	144
q7	1780	1678	1543	1543
q8	2532	2723	2968	2723
q9	7669	7472	7563	7472
q10	3761	3983	3546	3546
q11	524	443	417	417
q12	554	666	518	518
q13	2772	3164	2433	2433
q14	286	302	280	280
q15	q16	711	760	723	723
q17	1192	1277	1327	1277
q18	7164	6988	6550	6550
q19	975	982	1040	982
q20	2111	2109	2004	2004
q21	3997	3473	3326	3326
q22	472	426	376	376
Total cold run time: 50383 ms
Total hot run time: 47946 ms

@doris-robot
Copy link

TPC-DS: Total hot run time: 168708 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpcds-tools
TPC-DS sf100 test result on commit 314e84b8a192ae2ead690f17b9d4a6d6a9b5839f, data reload: false

query5	4342	654	501	501
query6	336	232	217	217
query7	4214	469	278	278
query8	347	253	241	241
query9	8786	2759	2732	2732
query10	514	427	342	342
query11	6992	5112	4936	4936
query12	182	133	129	129
query13	1274	494	366	366
query14	5728	3751	3504	3504
query14_1	2932	2885	2845	2845
query15	212	198	178	178
query16	979	460	451	451
query17	919	743	640	640
query18	2458	467	364	364
query19	218	210	193	193
query20	135	128	132	128
query21	215	143	113	113
query22	13325	14171	14748	14171
query23	16234	15827	15486	15486
query23_1	15738	15725	15360	15360
query24	7199	1647	1241	1241
query24_1	1264	1247	1243	1243
query25	590	479	397	397
query26	1239	272	148	148
query27	2784	480	303	303
query28	4492	1849	1834	1834
query29	823	566	474	474
query30	294	233	188	188
query31	1019	970	876	876
query32	93	74	76	74
query33	525	344	292	292
query34	928	875	515	515
query35	636	684	604	604
query36	1081	1169	1031	1031
query37	137	99	86	86
query38	2933	2971	2851	2851
query39	856	829	805	805
query39_1	793	784	789	784
query40	247	150	133	133
query41	64	60	62	60
query42	263	259	258	258
query43	239	247	221	221
query44	
query45	199	190	185	185
query46	874	980	614	614
query47	2111	2142	2017	2017
query48	305	315	232	232
query49	635	462	382	382
query50	698	278	214	214
query51	4084	4089	4044	4044
query52	262	266	257	257
query53	303	340	292	292
query54	311	276	275	275
query55	94	86	86	86
query56	310	329	324	324
query57	1946	1811	1692	1692
query58	289	278	266	266
query59	2800	2962	2748	2748
query60	347	346	342	342
query61	159	155	180	155
query62	642	592	540	540
query63	316	294	283	283
query64	5079	1296	1032	1032
query65	
query66	1466	457	359	359
query67	24299	24379	24199	24199
query68	
query69	409	317	287	287
query70	976	961	920	920
query71	344	308	312	308
query72	2785	2674	2454	2454
query73	548	546	317	317
query74	9591	9555	9443	9443
query75	2878	2809	2476	2476
query76	2266	1047	687	687
query77	377	380	317	317
query78	10883	11099	10443	10443
query79	1134	772	563	563
query80	951	629	557	557
query81	516	269	234	234
query82	1330	160	123	123
query83	342	264	256	256
query84	294	117	100	100
query85	888	490	446	446
query86	403	297	299	297
query87	3129	3140	3121	3121
query88	3540	2663	2651	2651
query89	431	366	351	351
query90	1922	187	172	172
query91	172	185	144	144
query92	76	77	71	71
query93	930	823	496	496
query94	523	324	282	282
query95	586	349	327	327
query96	644	508	236	236
query97	2444	2512	2403	2403
query98	236	225	223	223
query99	1000	1012	923	923
Total cold run time: 248496 ms
Total hot run time: 168708 ms

@hello-stephen
Copy link
Contributor

FE UT Coverage Report

Increment line coverage 0.00% (0/1) 🎉
Increment coverage report
Complete coverage report

@JNSimba
Copy link
Member Author

JNSimba commented Mar 19, 2026

run external

@JNSimba JNSimba requested a review from Copilot March 19, 2026 03:07
@JNSimba
Copy link
Member Author

JNSimba commented Mar 19, 2026

/review

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes an intermittent Postgres CDC failure where the first DML after a task restart could be silently dropped due to Debezium’s WalPositionLocator behavior in pgoutput non-streaming mode.

Changes:

  • Override extractBinlogStateOffset() in PostgresSourceReader to remove lsn_proc/lsn_commit keys before Debezium consumes the offset.
  • Clarify a log message in StreamingMultiTblTask to accurately reflect “timeout reason”.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java Adjusts Postgres offset passed to Debezium to avoid WalPositionLocator incorrectly filtering DML on restart.
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java Fixes misleading error log text for timeout-reason retrieval failures.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

@github-actions
Copy link
Contributor

Code Review Summary

PR: fix fix postgres DML silently dropped on task restart

Overview

This PR fixes a bug where the first DML of a new Postgres streaming transaction is intermittently dropped (10-20% failure rate) on task restart, with no error logged. The fix strips lsn_proc and lsn_commit keys from the persisted binlog state offset so that Debezium's WalPositionLocator skips its "find phase" entirely, preventing incorrect filtering of DML events. This is a workaround for FLINK-39265.

Critical Checkpoint Conclusions

  1. Goal / Correctness: The fix correctly addresses the root cause. In pgoutput non-streaming mode (proto_version=1, Debezium 1.9.x), BEGIN and DML messages share the same XLogData.data_start. When begin_lsn equals the previous commit_lsn, the find phase incorrectly marks DMLs as already-processed. Stripping lsn_proc/lsn_commit sets lastCommitStoredLsn=null in WalPositionLocator, causing immediate find-phase exit. Verified that all downstream code paths (Flink CDC PostgresOffset, Debezium Loader.load(), replication slot flushing) gracefully handle the absence of these keys.

  2. Modification size / clarity: Very small and focused — a single method override (7 lines of code) plus an excellent Javadoc comment explaining the rationale. The log message fix in StreamingMultiTblTask.java is a trivial correctness improvement. No issues.

  3. Concurrency: No concerns. extractBinlogStateOffset operates on a locally created HashMap returned by super.extractBinlogStateOffset(). No shared state is modified.

  4. Lifecycle management: No new lifecycle concerns introduced.

  5. Configuration items: None added. N/A.

  6. Incompatible changes / rolling upgrade: The persisted offset format changes (two keys stripped). Verified safe: (a) Debezium's PostgresOffsetContext.Loader.load() falls back to the main lsn value when lsn_proc/lsn_commit are absent; (b) existing offsets containing these keys continue to work since stripping only occurs on extraction; (c) the live streaming offsetContext continues to populate these keys normally during operation, so replication slot management is unaffected.

  7. Parallel code paths: MySqlSourceReader has its own extractBinlogStateOffset override that does not need this fix (MySQL binlog offsets are not affected by this Postgres-specific WAL position issue). No action needed.

  8. Test coverage: No tests exist for PostgresSourceReader or extractBinlogStateOffset. The entire fs_brokers/cdc_client module has minimal test coverage (only SchemaChangeHelperTest and SmallFileMgrTest). While testing this specific Debezium integration bug requires a running Postgres instance with specific WAL patterns (making unit testing difficult), a unit test verifying that extractBinlogStateOffset strips the expected keys from a mock offset map would be straightforward and valuable.

  9. Observability: The thorough Javadoc comment with JIRA reference provides good documentation. No additional logging needed for this fix path.

  10. Performance: No concerns — two HashMap.remove() calls on a small map.

  11. Other issues: None found. The log message fix ("Send get task fail reason" -> "Send get task timeout reason") correctly matches the method name getTimeoutReason().

Verdict

No blocking issues. The fix is correct, well-documented, and safe for production. The only suggestion is to consider adding a unit test for the extractBinlogStateOffset override to prevent future regressions, but this is not a blocker.

@JNSimba
Copy link
Member Author

JNSimba commented Mar 19, 2026

run external

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants