[flink] Preserve log-only source restore mode #3355
@loserwang1024 Could you please help review?
// Preserve Fluss-only (non-lake) startup across restore. Otherwise a restored
// enumerator with a non-null lakeSource would treat null as "not initialized yet"
// and generate lake snapshot splits.
lakeSource == null ? Collections.emptyList() : pendingHybridLakeFlussSplits;
I understand what you mean, but the code seems hard to understand without sufficient context. I've also thought about this:
- If the enumerator is created by FlinkSource#createEnumerator, it indicates a stateless restart. Therefore, whether to generate lake splits depends on whether it's a LakeSource.
- If the enumerator is created by FlinkSource#restoreEnumerator, there's no need to generate lake splits again. This is because before the first checkpoint is taken, FlinkSourceEnumerator#start → FlinkSourceEnumerator#generateHybridLakeFlussSplits has already been executed. Thus, upon restoration, the lake splits do not need to be regenerated.
Therefore, even if the job was previously started from a specified timestamp, according to this logic, as long as a checkpoint has been taken, upon stateful restart it will not read the lake splits again.
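The two cases above reduce to a small decision rule. The following is only a sketch of that reasoning: `EnumeratorOriginSketch`, its `Origin` enum, and `shouldGenerateLakeSplits` are hypothetical names introduced for illustration and are not part of the Fluss code base.

```java
// Hypothetical model of the two enumerator creation paths discussed above.
// It is not the Fluss implementation; it only encodes the reviewer's rule.
final class EnumeratorOriginSketch {

    // CREATED stands for FlinkSource#createEnumerator (stateless start),
    // RESTORED stands for FlinkSource#restoreEnumerator (stateful restart).
    enum Origin { CREATED, RESTORED }

    // A freshly created enumerator generates lake splits only when a lake
    // source is configured. A restored enumerator never regenerates them,
    // because start() already ran before the first checkpoint was taken.
    static boolean shouldGenerateLakeSplits(Origin origin, boolean hasLakeSource) {
        return origin == Origin.CREATED && hasLakeSource;
    }
}
```

Under this rule, a job that started from a specified timestamp and later restores with a lake source available still skips lake split generation.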
I agree the original approach in snapshotState() is hard to understand without sufficient context.
I've reworked the fix to use the checkpointTriggeredBefore flag in generateHybridLakeFlussSplits() instead. While the restore-awareness logic still requires some thought, this approach keeps all the complexity contained within a single method rather than spreading it across snapshotState().
Additionally, I changed startInStreamModeForNonPartitionedTable to call generateHybridLakeFlussSplits() synchronously, consistent with the partitioned-table path in start(). This ensures lake split initialization always completes before any checkpoint can be triggered, which is a prerequisite for the checkpointTriggeredBefore guard to work correctly.
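A rough model of the guard described in this comment might look like the following. It is a sketch under assumptions: `GuardedEnumeratorSketch` and its string-based splits are hypothetical, and only the names checkpointTriggeredBefore, pendingHybridLakeFlussSplits, and generateHybridLakeFlussSplits come from the discussion. The real method bodies may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for the enumerator. Splits are modelled as strings and
// a null lakeSnapshotSplits list models lakeSource == null (assumption).
final class GuardedEnumeratorSketch {
    private final List<String> lakeSnapshotSplits;
    private final boolean checkpointTriggeredBefore;
    // null means "lake splits not initialized yet".
    private List<String> pendingHybridLakeFlussSplits;

    GuardedEnumeratorSketch(
            List<String> lakeSnapshotSplits,
            List<String> restoredPendingSplits,
            boolean checkpointTriggeredBefore) {
        this.lakeSnapshotSplits = lakeSnapshotSplits;
        this.pendingHybridLakeFlussSplits = restoredPendingSplits;
        this.checkpointTriggeredBefore = checkpointTriggeredBefore;
    }

    List<String> generateHybridLakeFlussSplits() {
        if (pendingHybridLakeFlussSplits != null) {
            // Already initialized, either by an earlier call or by restored state.
            return pendingHybridLakeFlussSplits;
        }
        if (checkpointTriggeredBefore || lakeSnapshotSplits == null) {
            // Restored Fluss-only state, or no lake source at all:
            // mark as initialized without reading the lake.
            pendingHybridLakeFlussSplits = Collections.emptyList();
        } else {
            pendingHybridLakeFlussSplits = new ArrayList<>(lakeSnapshotSplits);
        }
        return pendingHybridLakeFlussSplits;
    }
}
```

The guard is only safe if initialization always finishes before the first checkpoint, which is why the comment pairs it with the synchronous call.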
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@loserwang1024 Comments have been addressed.
fresh-borzoni left a comment
@luoyuxia Thanks for the PR. I agree with the sync change and the serializer reorder, but I have a comment about the checkpointTriggeredBefore guard.
// Restored from checkpoint but pending lake split is null (e.g. the source was
// originally started in Fluss-only mode without lake). Do not generate lake
// splits for this restore; mark as initialized and return empty list.
if (checkpointTriggeredBefore) {
I think the checkpointTriggeredBefore guard is solving the restore ambiguity in the wrong layer.
Fresh null means “not initialized yet”.
Restored null should be normalized to “nothing pending”.
That normalization belongs in FlinkSource.restoreEnumerator(), not in generateHybridLakeFlussSplits().
WDYT?
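One way to picture the suggested normalization is the sketch below. It assumes a hypothetical helper class, `RestoreNormalizationSketch`, with string-based splits; none of these method names exist in Fluss, and the actual change to FlinkSource.restoreEnumerator() could be shaped differently.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper showing where the null handling would live if it were
// moved to the restore path, as proposed in the comment above.
final class RestoreNormalizationSketch {

    // Create path: null is kept and means "not initialized yet".
    static List<String> pendingForCreate() {
        return null;
    }

    // Restore path: a null from the checkpoint is normalized to
    // "nothing pending", so later code never sees a restored null.
    static List<String> pendingForRestore(List<String> checkpointedPending) {
        return checkpointedPending == null
                ? Collections.<String>emptyList()
                : checkpointedPending;
    }

    // With normalization done at restore time, the split generation method
    // only needs a plain null check and no restore-awareness flag.
    static boolean needsLakeInitialization(List<String> pending) {
        return pending == null;
    }
}
```

With this shape, the split generation logic stays unaware of whether the enumerator was created or restored.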
Purpose
Linked issue: close #3354
Preserve the original Fluss-only (non-lake) startup semantics after a Flink source restore. When the source starts from a specified Fluss log position, it should not initialize lake snapshot splits after restore just because lakeSource is available again.
Brief change log
- […] remainingHybridLakeFlussSplits list when the enumerator checkpoints with lakeSource == null.
- […] lakeSource, while still requiring lakeSource for non-empty hybrid split state.
- […] pendingHybridLakeFlussSplits so null, empty, and non-empty states are explicit.
- […] lakeSource and a registered lake snapshot, and verifies no LakeSnapshotSplit is generated.
- […] lakeSource == null.
Tests
./mvnw -pl fluss-flink/fluss-flink-common -Dtest=SourceEnumeratorStateSerializerTest,FlinkSourceEnumeratorTest#testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits test
./mvnw -pl fluss-flink/fluss-flink-common spotless:apply
git diff --check
API and Format
No public API, storage format, or checkpoint serializer version change. The fix only changes the value stored in the existing enumerator state field for Fluss-only source checkpoints and permits that empty-state sentinel to deserialize without a lake source.
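The serializer behaviour described here can be illustrated with a toy codec. This is a sketch under assumptions, not the actual SourceEnumeratorStateSerializer: the class `HybridSplitStateCodecSketch`, the byte layout, and the string-based splits are all invented for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Toy codec: an empty list acts as the "Fluss-only" sentinel and can be read
// back without a lake source, while non-empty state still requires one.
final class HybridSplitStateCodecSketch {

    static byte[] serialize(List<String> pendingSplits) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(pendingSplits != null); // presence flag
            if (pendingSplits != null) {
                out.writeInt(pendingSplits.size());
                for (String split : pendingSplits) {
                    out.writeUTF(split);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static List<String> deserialize(byte[] data, boolean hasLakeSource) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            if (!in.readBoolean()) {
                return null; // state was never initialized
            }
            int count = in.readInt();
            if (count > 0 && !hasLakeSource) {
                // Real lake splits cannot be decoded without a lake source.
                throw new IllegalStateException("lake source required for non-empty hybrid split state");
            }
            List<String> splits = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                splits.add(in.readUTF());
            }
            return splits;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the empty list uses the same field as any other state, this kind of change would not need a new serializer version, which matches the claim above.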
Documentation
No new feature or user-facing configuration change.
Generative AI disclosure