Skip to content

[flink] Preserve log-only source restore mode#3355

Open
luoyuxia wants to merge 2 commits into
apache:mainfrom
luoyuxia:fix-flink-log-only-restore-lake-splits
Open

[flink] Preserve log-only source restore mode#3355
luoyuxia wants to merge 2 commits into
apache:mainfrom
luoyuxia:fix-flink-log-only-restore-lake-splits

Conversation

@luoyuxia
Copy link
Copy Markdown
Contributor

@luoyuxia luoyuxia commented May 20, 2026

Purpose

Linked issue: close #3354

Preserve the original Fluss-only (non-lake) startup semantics after a Flink source restore. When the source starts from a specified Fluss log position, it should not initialize lake snapshot splits after restore just because lakeSource is available again.

Brief change log

  • Persist an empty remainingHybridLakeFlussSplits list when the enumerator checkpoints with lakeSource == null.
  • Allow empty pending hybrid split lists to deserialize without lakeSource, while still requiring lakeSource for non-empty hybrid split state.
  • Document the three-state meaning of pendingHybridLakeFlussSplits so null, empty, and non-empty states are explicit.
  • Add a restore regression test that checkpoints a Fluss-only enumerator, restores it with a non-null lakeSource and a registered lake snapshot, and verifies no LakeSnapshotSplit is generated.
  • Add a serializer round-trip test for empty pending hybrid split state with lakeSource == null.

Tests

  • ./mvnw -pl fluss-flink/fluss-flink-common -Dtest=SourceEnumeratorStateSerializerTest,FlinkSourceEnumeratorTest#testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits test
  • ./mvnw -pl fluss-flink/fluss-flink-common spotless:apply
  • git diff --check

API and Format

No public API, storage format, or checkpoint serializer version change. The fix only changes the value stored in the existing enumerator state field for Fluss-only source checkpoints and permits that empty-state sentinel to deserialize without a lake source.

Documentation

No new feature or user-facing configuration change.

Generative AI disclosure

  • Yes: OpenAI Codex was used to author this PR following the repository AGENTS.md guidance.

@luoyuxia luoyuxia force-pushed the fix-flink-log-only-restore-lake-splits branch 2 times, most recently from cdabb47 to 71b01aa Compare May 20, 2026 03:55
@luoyuxia luoyuxia force-pushed the fix-flink-log-only-restore-lake-splits branch from 71b01aa to a01bd57 Compare May 20, 2026 05:54
@luoyuxia luoyuxia requested a review from loserwang1024 May 20, 2026 07:28
@luoyuxia
Copy link
Copy Markdown
Contributor Author

@loserwang1024 Could you please help review

// Preserve Fluss-only (non-lake) startup across restore. Otherwise a restored
// enumerator with a non-null lakeSource would treat null as "not initialized yet"
// and generate lake snapshot splits.
lakeSource == null ? Collections.emptyList() : pendingHybridLakeFlussSplits;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand what you mean, but the code seems hard to understand without sufficient context. I've also thought about this:

  • If the enumerator is created by FlinkSource#createEnumerator, it indicates a stateless restart. Therefore, whether to generate lake splits depends on whether it's a LakeSource.

  • If the enumerator is created by FlinkSource#restoreEnumerator, there's no need to generate lake splits again. This is because before the first checkpoint is taken, FlinkSourceEnumerator#start → FlinkSourceEnumerator#generateHybridLakeFlussSplits has already been executed. Thus, upon restoration, the lake splits do not need to be regenerated.

Therefore, even if the job was previously started from a specified timestamp, according to this logic, as long as a checkpoint has been taken, upon stateful restart it will not read the lake splits again.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree the original approach in snapshotState() is hard to understand without sufficient context.

I've reworked the fix to use the checkpointTriggeredBefore flag in generateHybridLakeFlussSplits() instead. While the restore-awareness logic still requires some thought, this approach keeps all the complexity contained within a single method rather than spreading it across snapshotState().

Additionally, I changed startInStreamModeForNonPartitionedTable to call generateHybridLakeFlussSplits() synchronously, consistent with the partitioned-table path in start(). This ensures lake split initialization always completes before any checkpoint can be triggered, which is a prerequisite for the checkpointTriggeredBefore guard to work correctly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@luoyuxia luoyuxia requested a review from loserwang1024 May 20, 2026 09:41
@luoyuxia
Copy link
Copy Markdown
Contributor Author

@loserwang1024 Comments has been addressed.

Copy link
Copy Markdown
Member

@fresh-borzoni fresh-borzoni left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@luoyuxia Ty for the PR, I agree with the sync change and the serializer reorder, but I have comment about checkpointTriggeredBefore guard.

// Restored from checkpoint but pending lake split is null(e.g. the source was
// originally started in Fluss-only mode without lake). Do not generate lake
// splits for this restore; mark as initialized and return empty list.
if (checkpointTriggeredBefore) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the checkpointTriggeredBefore guard is solving the restore ambiguity in the wrong layer.

Fresh null means “not initialized yet”.
Restored null should be normalized to “nothing pending”.

That normalization belongs in FlinkSource.restoreEnumerator(), not in generateHybridLakeFlussSplits().
WDYT?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[flink] Avoid initializing lake snapshot splits after log-only source restore

3 participants