
NIFI-15464 - ConsumeKafka - Commit pending offsets for revoked partitions #10769

Open

pvillard31 wants to merge 5 commits into apache:main from pvillard31:NIFI-15464

Conversation


@pvillard31 pvillard31 commented Jan 14, 2026

Summary

NIFI-15464 - ConsumeKafka - Commit pending offsets for revoked partitions

Problem

The ConsumeKafka processor using Kafka3ConnectionService causes duplicate message processing when a consumer group rebalance occurs (when a consumer starts or stops for example). This happens because the Kafka3ConsumerService.onPartitionsRevoked() callback only performs a rollback instead of committing pending offsets before partitions are revoked.

When a rebalance is triggered:

  • Kafka calls onPartitionsRevoked() before the consumer loses ownership of its partitions
  • The current implementation just rolls back to the last committed offset
  • Progress on any messages that were polled but not yet committed is discarded
  • The processor later attempts to commit offsets but receives RebalanceInProgressException
  • Another consumer re-processes the same messages, causing duplicates

This issue did not occur with the legacy ConsumeKafka_2_6 processor in NiFi 1.x.
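For illustration, here is a minimal sketch of the rollback-only pattern described above, written against the plain Kafka consumer API. The class and field names are illustrative, not the actual Kafka3ConsumerService code:

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: a listener that, like the current implementation,
// rolls back instead of committing pending offsets on revocation
class RollbackOnlyRebalanceListener implements ConsumerRebalanceListener {
    private final Consumer<byte[], byte[]> consumer;

    RollbackOnlyRebalanceListener(final Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        // Seek back to the last committed offset and discard in-flight progress;
        // nothing is committed here, so the next owner of these partitions
        // re-reads everything polled since the last commit
        final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(Set.copyOf(partitions));
        for (final TopicPartition partition : partitions) {
            final OffsetAndMetadata offsetAndMetadata = committed.get(partition);
            if (offsetAndMetadata != null) {
                consumer.seek(partition, offsetAndMetadata.offset());
            }
        }
    }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
    }
}
```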

Root Cause

In NiFi 1.x, the ConsumerLease class implemented ConsumerRebalanceListener and had direct access to both the uncommitted offsets (tracked internally) and the ProcessSession. When onPartitionsRevoked() was called, it would commit pending offsets before partitions were revoked.

In NiFi 2.x, the architecture changed:

  • Kafka3ConsumerService handles Kafka consumer operations and implements ConsumerRebalanceListener
  • ConsumeKafka processor handles session/FlowFile operations and offset tracking (via OffsetTracker)
  • There was no mechanism for Kafka3ConsumerService to commit pending offsets during onPartitionsRevoked() because it didn't track them

Solution

This PR restores the NiFi 1.x behavior by tracking uncommitted offsets internally within Kafka3ConsumerService:

  • Added a ConcurrentHashMap<TopicPartition, Long> to track the maximum offset for each partition as records are polled
  • Modified poll() to update the tracked offsets for each record consumed
  • Modified onPartitionsRevoked() to commit the tracked offsets for revoked partitions before they are taken away
  • Modified commit() to clear tracked offsets for partitions that have been committed
  • Modified rollback() to clear tracked offsets for partitions being rolled back

This approach mirrors what was done in ConsumerLease.uncommittedOffsetsMap in NiFi 1.x.
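A simplified sketch of this tracking approach against the plain Kafka consumer API (names are illustrative, not the exact PR code):

```java
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class OffsetTrackingSketch {
    private final Consumer<byte[], byte[]> consumer;

    // Highest polled offset per partition, pending commit
    private final Map<TopicPartition, Long> uncommittedOffsets = new ConcurrentHashMap<>();

    OffsetTrackingSketch(final Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
        final ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            // Remember the highest offset seen for each partition
            uncommittedOffsets.merge(new TopicPartition(record.topic(), record.partition()),
                    record.offset(), Math::max);
        }
        return records;
    }

    // Called from ConsumerRebalanceListener.onPartitionsRevoked()
    void commitRevokedPartitions(final Collection<TopicPartition> partitions) {
        final Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (final TopicPartition partition : partitions) {
            final Long offset = uncommittedOffsets.remove(partition);
            if (offset != null) {
                // Kafka expects the offset of the next record to be consumed
                toCommit.put(partition, new OffsetAndMetadata(offset + 1));
            }
        }
        if (!toCommit.isEmpty()) {
            consumer.commitSync(toCommit);
        }
    }
}
```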

Important Note

While this fix restores correct behavior, a cleaner architectural approach would be to introduce a callback mechanism where the processor can be notified during onPartitionsRevoked() and provide the offsets to commit. This would:

  • Maintain proper separation of concerns between the service and processor layers
  • Preserve the correct ordering of session commit before Kafka offset commit
  • Give the processor full control over what gets committed during rebalance

This larger refactor could be done as a follow-up iteration if desired, though it would increase the coupling between the processors and the service layer, so I am not sure it is a great idea.

Testing

I spent some time adding integration testing and it works locally. Depending on timing and such, it may turn out to be flaky... If that's the case, we can drop the new IT class completely, but let's see...

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull request contains commits signed with a registered key indicating Verified status

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

@pvillard31 pvillard31 added the bug label Feb 4, 2026

@exceptionfactory exceptionfactory left a comment


Thanks for addressing this issue @pvillard31. The general approach of committing offsets on partitions revoked looks good, but I highlighted a key implementation concern on storing the last offsets. Aside from that, I noted a couple of minor recommendations on the tests.

```java
}

// Track the maximum offset for each partition to commit during rebalance
for (final ConsumerRecord<byte[], byte[]> record : consumerRecords) {
```

This approach requires iterating over all Records multiple times, once here, and again in actual usage in RecordIterable. The RecordIterable is designed for optimal iteration and client usage. Instead of this loop, I recommend adjusting RecordIterable to track the offset retrieved. This should also provide the opportunity to get the list of TopicPartitions once, and avoid creating a new instance for every Record.
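A hypothetical sketch of that suggestion: an iterator wrapper that records the last offset as each record is handed out, reusing the TopicPartition instance until the partition changes. The names here are assumed for illustration, not the actual NiFi classes:

```java
import java.util.Iterator;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

class OffsetTrackingIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
    private final Iterator<ConsumerRecord<byte[], byte[]>> delegate;
    private final Map<TopicPartition, Long> uncommittedOffsets;
    private TopicPartition lastPartition;  // reused until the partition changes

    OffsetTrackingIterator(final Iterator<ConsumerRecord<byte[], byte[]>> delegate,
            final Map<TopicPartition, Long> uncommittedOffsets) {
        this.delegate = delegate;
        this.uncommittedOffsets = uncommittedOffsets;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public ConsumerRecord<byte[], byte[]> next() {
        final ConsumerRecord<byte[], byte[]> record = delegate.next();
        // Create a new TopicPartition only when the partition changes,
        // avoiding one instance per record
        if (lastPartition == null
                || !lastPartition.topic().equals(record.topic())
                || lastPartition.partition() != record.partition()) {
            lastPartition = new TopicPartition(record.topic(), record.partition());
        }
        // Track offsets during normal iteration instead of a separate pre-pass
        uncommittedOffsets.merge(lastPartition, record.offset(), Math::max);
        return record;
    }
}
```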

@pvillard31

Thanks for the review @exceptionfactory - I pushed a commit to address your comments


@exceptionfactory exceptionfactory left a comment


Thanks for the updates @pvillard31, I noted an additional recommendation regarding TopicPartition object creation.

@markap14

markap14 commented Feb 5, 2026

Hey @pvillard31 looking through this, I'm not 100% sure, because this Processor/Controller Service pair is a bit complex. But I think the approach here could lead to data loss. Specifically, it is crucial that we do NOT commit any offsets to Kafka until after the Processor has called ProcessSession.commitAsync AND the callback is called. Otherwise, it is possible that the following series of events occurs:

  • Processor receives events but hasn't written them out
  • Rebalance occurs and the offsets get committed
  • Processor fails to serialize them because Content Repo is out of disk space or something like that
  • Now we've already committed the offsets and the data is lost, we can't replay.

Or, similarly, we write the records just fine but session commit fails:

  • Processor receives events and writes them to the FlowFile
  • Rebalance occurs and the offsets get committed
  • Processor commits session but session commit fails due to FlowFile Repository out of disk space
  • Now we've already committed the offsets and the data is lost, we can't replay.

Or, a restart / process dies:

  • Processor receives events and writes them to the FlowFile
  • Rebalance occurs and the offsets get committed
  • Process is killed before session commit happens
  • Now we've already committed the offsets and the data is lost, we can't replay.

You did note above in the Important Note that it may make sense to add some more coupling here where the Processor is made aware of a rebalance, but I think it is absolutely required in order to commit the offsets on rebalance. I think we could avoid the callback mechanism, though, as that is definitely messy. Rather, the Processor is already polling the service. We could have the poll() get interrupted in the case of a rebalance, and we could have the Processor always check before a call to poll() whether or not a rebalance has occurred. If so, it would be responsible for committing the session and only then, upon successful session commit, could it trigger the Kafka offset commit.

Please let me know if I'm missing something here, but I think this approach basically trades the potential of duplicates in favor of the potential for data loss, and we always want to prefer duplicates over data loss.
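A rough sketch of the ordering described above. The service methods here (isRebalanceTriggered, commitTrackedOffsets) are assumptions for illustration, not the actual Kafka3ConsumerService API:

```java
import org.apache.nifi.processor.ProcessSession;

// Hypothetical service surface assumed for this sketch
interface RebalanceAwareConsumerService {
    boolean isRebalanceTriggered();
    void commitTrackedOffsets();
}

class RebalanceHandlingSketch {
    // Called from the processor before each poll
    void commitBeforePollIfRebalancing(final ProcessSession session,
            final RebalanceAwareConsumerService service) {
        if (service.isRebalanceTriggered()) {
            // Commit the session first; the Kafka offsets are committed only
            // in the success callback, so a failed session commit leaves the
            // offsets uncommitted and the records are re-polled: duplicates
            // are possible, data loss is not
            session.commitAsync(service::commitTrackedOffsets);
        }
    }
}
```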

@pvillard31

Thanks for the review @markap14 - I implemented an integration test that confirms your data loss scenario. Moving this PR to draft as I'm working on the other discussed approach.

@pvillard31 pvillard31 marked this pull request as draft February 5, 2026 18:45
@pvillard31 pvillard31 marked this pull request as ready for review February 5, 2026 22:45
Commit: …ions

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
- Different approach to make sure processor commits session

@markap14 markap14 left a comment


Thanks @pvillard31, I think the latest commit addresses the concern that I had. I don't see any patterns at this time that would introduce data loss or anything like that. Thanks for fixing! Will leave it up to @exceptionfactory to give a final approval since he currently has the PR marked with 'Request changes'.
