Skip to content

[FLINK-39308][runtime] Skip empty file-merging operator state snapshots#27814

Open
infocusmodereal wants to merge 1 commit intoapache:masterfrom
infocusmodereal:fix/FLINK-39308/skip-empty-operator-state
Open

[FLINK-39308][runtime] Skip empty file-merging operator state snapshots#27814
infocusmodereal wants to merge 1 commit intoapache:masterfrom
infocusmodereal:fix/FLINK-39308/skip-empty-operator-state

Conversation

@infocusmodereal
Copy link
Copy Markdown

What is the purpose of the change

This pull request avoids materializing file-merging operator state handles when operator list state is registered but empty.

Today DefaultOperatorStateBackendSnapshotStrategy only takes the empty fast path when there are no registered operator states at all. If empty operator list states are registered, file-merging checkpoints can still create tiny segment-backed handles. During restore, OperatorStateRestoreOperation opens those handles even though they contain no operator-state partitions. On object stores this adds avoidable restore overhead.

Brief change log

  • Detect the case where there are no broadcast states and all registered operator list states are empty
  • Reuse the existing empty snapshot fast path for that case
  • Return EmptyFileMergingOperatorStreamStateHandle for file-merging checkpoints and SnapshotResult.empty() otherwise
  • Add tests for empty registered operator state snapshots and file-merging restore

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

This change added tests and can be verified as follows:

  • Added testSnapshotWithEmptyRegisteredOperatorState
  • Added testFileMergingSnapshotRestoreWithEmptyRegisteredUnionState
  • Ran JAVA_HOME=$(/usr/libexec/java_home -v 17) ./mvnw -pl flink-runtime -Dtest=OperatorStateBackendTest,OperatorStateRestoreOperationTest,SharedStateRegistryTest test -Djdk17 -Pjava17-target

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Mar 23, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
syncPartResource.getRegisteredBroadcastStatesDeepCopies();

if (registeredBroadcastStatesDeepCopies.isEmpty()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is strange that this if does
registeredOperatorStatesDeepCopies.isEmpty()
and the first thing the method you have introduced does is:
registeredOperatorStatesDeepCopies.isEmpty()

I suggest doing everything in the method

return false;
}

for (PartitionableListState<?> listState : registeredOperatorStatesDeepCopies.values()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the method is called private boolean hasOnlyEmptyOperatorListStates(
I am not sure what the test for emptiness is. The only thing I see is listState == null,
but if listState == null then we return false.

What am I missing?


if (registeredBroadcastStatesDeepCopies.isEmpty()
&& hasOnlyEmptyOperatorListStates(registeredOperatorStatesDeepCopies)) {
if (streamFactory instanceof FsMergingCheckpointStorageLocation) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After casting to FsMergingCheckpointStorageLocation, it directly returns EmptyFileMergingOperatorStreamStateHandle. Add a comment explaining why empty snapshots take this branch.

location.getExclusiveStateHandle(),
location.getSharedStateHandle()));
} else {
return snapshotCloseableRegistry -> SnapshotResult.empty();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The else branch returns SnapshotResult.empty(). Add a comment explaining that this is the handling strategy for empty snapshots when not using FsMerging, for clarity.

}

@Test
void testSnapshotWithEmptyRegisteredOperatorState() throws Exception {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In testSnapshotWithEmptyRegisteredOperatorState, currently asserts stateHandle is null. Also assert snapshotResult.isEmpty() or its type to ensure empty snapshot behavior is as expected.

}

@Test
void testFileMergingSnapshotRestoreWithEmptyRegisteredUnionState(@TempDir File tmpFolder)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In testFileMergingSnapshotRestoreWithEmptyRegisteredUnionState, the finally block only calls discardState() on stateHandle. Consider additionally verifying that FsMerging-related resources (temp files, snapshotManager) are correctly released to prevent leaks.

FutureUtils.runIfNotDoneAndGet(snapshot);
stateHandle = snapshotResult.getJobManagerOwnedSnapshot();

assertThat(stateHandle).isInstanceOf(EmptyFileMergingOperatorStreamStateHandle.class);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asserting stateHandle type as EmptyFileMergingOperatorStreamStateHandle is correct. Add a comment explaining this is a special case for empty operator state to improve readability.

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Mar 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants