Skip to content

Conversation

@nateab
Copy link
Contributor

@nateab nateab commented Feb 11, 2026

What is the purpose of the change

When a Flink job recovers from a checkpoint/savepoint, the CommitterOperator replays uncommitted transactions. If a Kafka transaction has expired or the producer ID mapping is lost (e.g., InvalidPidMappingException), the commit fails via signalFailedWithUnknownReason() which throws unconditionally, causing an infinite restart loop with no way to recover -- even from earlier savepoints.

The CommitRequestImpl code already had TODO comments noting: "let the user configure a strategy for failing and apply it here". This PR implements that configurability by adding a CommitFailureStrategy enum (FAIL/WARN) and a config option sink.committer.failure-strategy that controls whether unknown commit failures throw (default, preserving current behavior) or log a warning and skip the committable, allowing recovery to proceed.

Brief change log

  • Added CommitFailureStrategy enum (FAIL/WARN) in flink-core with @PublicEvolving annotation
  • Added sink.committer.failure-strategy config option to SinkOptions (default: FAIL)
  • Modified CommitRequestImpl.signalFailedWithUnknownReason() to respect the configured strategy
  • Added logging to CommitRequestImpl.signalFailedWithKnownReason() to match interface contract
  • Added overloaded commit() method to CheckpointCommittableManager interface with strategy parameter (backward-compatible default method)
  • Updated CheckpointCommittableManagerImpl to set failure strategy on requests before committing
  • Updated CommitterOperator and GlobalCommitterOperator to read and pass the strategy config

Verifying this change

This change added tests and can be verified as follows:

  • Added CommitRequestImplTest with 4 unit tests: default strategy throws, explicit FAIL throws, WARN logs and skips, signalFailedWithKnownReason always discards regardless of strategy
  • Added 2 tests to CheckpointCommittableManagerImplTest: WARN strategy completes without throwing on unknown failures, FAIL strategy throws on unknown failures
  • Added 3 tests to SinkV2CommitterOperatorTest: operator-level WARN strategy skips failures, FAIL strategy throws, and WARN strategy succeeds on state restore/recovery with a failing committer

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes (CommitFailureStrategy enum and SinkOptions.COMMITTER_FAILURE_STRATEGY are @PublicEvolving)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no (only affects commit path, not per-record processing)
  • Anything that affects deployment or recovery: yes (enables recovery from previously-fatal commit failures when configured with WARN strategy)
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs (on CommitFailureStrategy enum, SinkOptions.COMMITTER_FAILURE_STRATEGY config option description)

…ink V2

When recovering from a checkpoint/savepoint, the CommitterOperator replays
uncommitted transactions. If a transaction has expired or the producer ID
mapping is lost, the commit fails with signalFailedWithUnknownReason() which
throws unconditionally, causing an infinite restart loop with no recovery path.

This adds a CommitFailureStrategy enum (FAIL/WARN) and a new config option
sink.committer.failure-strategy that controls whether unknown commit failures
throw (default, preserving current behavior) or log a warning and skip the
committable, allowing recovery to proceed.
@flinkbot
Copy link
Collaborator

flinkbot commented Feb 11, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants