[FLINK-39308][runtime] Skip empty file-merging operator state snapshots#27814
[FLINK-39308][runtime] Skip empty file-merging operator state snapshots#27814infocusmodereal wants to merge 1 commit intoapache:masterfrom
Conversation
| Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies = | ||
| syncPartResource.getRegisteredBroadcastStatesDeepCopies(); | ||
|
|
||
| if (registeredBroadcastStatesDeepCopies.isEmpty() |
There was a problem hiding this comment.
It is strange that this if does
registeredOperatorStatesDeepCopies.isEmpty()
and the first thing the method you have introduced does is:
registeredOperatorStatesDeepCopies.isEmpty()
I suggest doing everything in the method
| return false; | ||
| } | ||
|
|
||
| for (PartitionableListState<?> listState : registeredOperatorStatesDeepCopies.values()) { |
There was a problem hiding this comment.
I see the method is called private boolean hasOnlyEmptyOperatorListStates(
I am not sure what the test for emptiness is. The only thing I see is listState == null,
but if listState == null then we return false.
What am I missing?
|
|
||
| if (registeredBroadcastStatesDeepCopies.isEmpty() | ||
| && hasOnlyEmptyOperatorListStates(registeredOperatorStatesDeepCopies)) { | ||
| if (streamFactory instanceof FsMergingCheckpointStorageLocation) { |
There was a problem hiding this comment.
After casting to FsMergingCheckpointStorageLocation, it directly returns EmptyFileMergingOperatorStreamStateHandle. Add a comment explaining why empty snapshots take this branch.
| location.getExclusiveStateHandle(), | ||
| location.getSharedStateHandle())); | ||
| } else { | ||
| return snapshotCloseableRegistry -> SnapshotResult.empty(); |
There was a problem hiding this comment.
The else branch returns SnapshotResult.empty(). Add a comment explaining that this is the handling strategy for empty snapshots when not using FsMerging, for clarity.
| } | ||
|
|
||
| @Test | ||
| void testSnapshotWithEmptyRegisteredOperatorState() throws Exception { |
There was a problem hiding this comment.
In testSnapshotWithEmptyRegisteredOperatorState, currently asserts stateHandle is null. Also assert snapshotResult.isEmpty() or its type to ensure empty snapshot behavior is as expected.
| } | ||
|
|
||
| @Test | ||
| void testFileMergingSnapshotRestoreWithEmptyRegisteredUnionState(@TempDir File tmpFolder) |
There was a problem hiding this comment.
In testFileMergingSnapshotRestoreWithEmptyRegisteredUnionState, the finally block only calls discardState() on stateHandle. Consider additionally verifying that FsMerging-related resources (temp files, snapshotManager) are correctly released to prevent leaks.
| FutureUtils.runIfNotDoneAndGet(snapshot); | ||
| stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); | ||
|
|
||
| assertThat(stateHandle).isInstanceOf(EmptyFileMergingOperatorStreamStateHandle.class); |
There was a problem hiding this comment.
Asserting stateHandle type as EmptyFileMergingOperatorStreamStateHandle is correct. Add a comment explaining this is a special case for empty operator state to improve readability.
What is the purpose of the change
This pull request avoids materializing file-merging operator state handles when operator list state is registered but empty.
Today
DefaultOperatorStateBackendSnapshotStrategyonly takes the empty fast path when there are no registered operator states at all. If empty operator list states are registered, file-merging checkpoints can still create tiny segment-backed handles. During restore,OperatorStateRestoreOperationopens those handles even though they contain no operator-state partitions. On object stores this adds avoidable restore overhead.Brief change log
EmptyFileMergingOperatorStreamStateHandlefor file-merging checkpoints andSnapshotResult.empty()otherwiseVerifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
This change added tests and can be verified as follows:
testSnapshotWithEmptyRegisteredOperatorStatetestFileMergingSnapshotRestoreWithEmptyRegisteredUnionStateJAVA_HOME=$(/usr/libexec/java_home -v 17) ./mvnw -pl flink-runtime -Dtest=OperatorStateBackendTest,OperatorStateRestoreOperationTest,SharedStateRegistryTest test -Djdk17 -Pjava17-targetDoes this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation