Skip to content

[FLINK-35321] Ensure pendingCommitables metric is not re-registered when copying CommitableCollector#27598

Open
DeamonDev wants to merge 6 commits intoapache:masterfrom
DeamonDev:FLINK-35321-do-not-register-pending-commitables-metric-while-collector-copy
Open

[FLINK-35321] Ensure pendingCommitables metric is not re-registered when copying CommitableCollector#27598
DeamonDev wants to merge 6 commits intoapache:masterfrom
DeamonDev:FLINK-35321-do-not-register-pending-commitables-metric-while-collector-copy

Conversation

@DeamonDev
Copy link

What is the purpose of the change

In this PR I add simple flag to constructor of CommittableCollector in order to stop re-registering pendingCommitables metric while deep-copy of this class.

Verifying this change

This change is already covered by existing tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Signed-off-by: deamondev <piotr.rudnicki94@protonmail.com>
@flinkbot
Copy link
Collaborator

flinkbot commented Feb 12, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

this.metricGroup = metricGroup;
this.metricGroup.setCurrentPendingCommittablesGauge(this::getNumPending);

if (setCurrentPendingCommitablesGauge) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll want add a unit test in CommittableCollectorTest.java to verify that copy() does not re-register metrics as expected. A simple metric group stub that counts gauge registrations with before/after assertions should be enough.

Copy link
Author

@DeamonDev DeamonDev Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added unit test by simply using spy method provided by Mockito. I think its more elegant than writing new wrapper with custom reference counting. I reset Mockito's reference counters before each test, so it is thread-safe.

edit: I see mockito is discouraged. I'll rewrite it then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor suggestion: for consistency, we may also want to update the static ofLegacy() path as well and adjust its collector to reflect the same changes.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, now the constructor is

public CommittableCollector(
     SinkCommitterMetricGroup metricGroup, boolean shouldSetPendingCommittablesGauge) {
     this(new TreeMap<>(), metricGroup, shouldSetPendingCommittablesGauge);
}

and I adjusted it only in ofLegacy method where I think it should be set to false. Still I'm not quite sure if thats correct. So, I have another request for advice.

@DeamonDev DeamonDev changed the title [FLINK-35321] Add flag for metric registration [FLINK-35321] Ensure pendingCommitables is not re-registered when copying CommitableCollector Feb 12, 2026
@DeamonDev DeamonDev changed the title [FLINK-35321] Ensure pendingCommitables is not re-registered when copying CommitableCollector [FLINK-35321] Ensure pendingCommitables metric is not re-registered when copying CommitableCollector Feb 12, 2026
@DeamonDev
Copy link
Author

Thanks for your insight @rionmonster . I wonder on whether I should set this very new flag when spawning CommitableCollector in CommitableCollectionSerializer#deserializeV2. ATM I've set it to false but I am not quite sure. WDYT?

@rionmonster
Copy link
Contributor

@DeamonDev

I wonder on whether I should set this very new flag when spawning CommitableCollector in CommitableCollectionSerializer#deserializeV2. ATM I've set it to false but I am not quite sure. WDYT?

Yes, I think using false would be correct in this case. A cursory trace through deserializeV2() seems to validate that as these deserialized instances are transient and eventually merged into the live collector:

  • deserializeV2 creates a new CommitableCollector instance from the checkpoint
  • This new instance gets returned to the original iterator in CommitterOperator#initializeState and merged into the existing live instance of the collector (copying its data via a merge)

So we don't want to re-reregister the metrics during those transient instances that eventually are just discarded.

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Feb 13, 2026
Signed-off-by: deamondev <piotr.rudnicki94@protonmail.com>
Signed-off-by: deamondev <piotr.rudnicki94@protonmail.com>
Signed-off-by: deamondev <piotr.rudnicki94@protonmail.com>
@DeamonDev
Copy link
Author

@flinkbot run azure

Signed-off-by: deamondev <piotr.rudnicki94@protonmail.com>
Signed-off-by: deamondev <piotr.rudnicki94@protonmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants