NIFI-15464 - ConsumeKafka - Commit pending offsets for revoked partitions#10769
NIFI-15464 - ConsumeKafka - Commit pending offsets for revoked partitions#10769pvillard31 wants to merge 5 commits intoapache:mainfrom
Conversation
exceptionfactory
left a comment
There was a problem hiding this comment.
Thanks for addressing this issue @pvillard31. The general approach of committing offsets on partitions revoked looks good, but I highlighted a key implementation concern on storing the last offsets. Aside from that, I noted a couple minor recommendations on the tests.
| } | ||
|
|
||
| // Track the maximum offset for each partition to commit during rebalance | ||
| for (final ConsumerRecord<byte[], byte[]> record : consumerRecords) { |
There was a problem hiding this comment.
This approach requires iterating over all Records multiple times, once here, and again in actual usage in RecordIterable. The RecordIterable is designed for optimal iteration and client usage. Instead of this loop, I recommend adjusting RecordIterable to track the offset retrieved. This should also provide the opportunity to get the list of TopicPartitions once, and avoid creating a new instance for every Record.
...ka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java
Show resolved
Hide resolved
...ka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java
Outdated
Show resolved
Hide resolved
...ka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java
Outdated
Show resolved
Hide resolved
...e-shared/src/test/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerServiceTest.java
Outdated
Show resolved
Hide resolved
5e4adef to
5af4163
Compare
|
Thanks for the review @exceptionfactory - I pushed a commit to address your comments |
exceptionfactory
left a comment
There was a problem hiding this comment.
Thanks for the updates @pvillard31, I noted an additional recommendation regarding TopicPartition object creation.
...rvice-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
Outdated
Show resolved
Hide resolved
|
Hey @pvillard31 looking through this, I'm not 100% sure, because this Processor/CS are a bit complex. But I think the approach here could lead to data loss. Specifically, it is crucial that we do NOT commit any offsets to Kafka until after the Processor has called
Or, similarly, we write the records just fine but session commit fails:
Or, a restart / process dies:
You did note above in the Important Note that it may make sense to add some more coupling here where the Processor is made known of a rebalance, but I think it is absolutely required in order to commit the offsets on rebalance. I think we could avoid the callback mechanism, though, as that is definitely messy. Rather, the Processor is already polling the service. We could have the poll() get interrupted in the case of a rebalance, and we could have the Processor always checking before a call to Please let me know if I'm missing something here, but I think this approach basically trades the potential of duplicates in favor of the potential for data loss, and we always want to prefer duplicates over data loss. |
|
Thanks for the review @markap14 - I implemented an integration test that confirms your data loss scenario. Moving this PR to draft as I'm working on the other discussed approach. |
b184bdd to
09a4e81
Compare
…ions Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
- Different approach to make sure processor commits session
dbef90f to
4fcac99
Compare
markap14
left a comment
There was a problem hiding this comment.
Thanks @pvillard31 I think the latest commit addresses the concern that I have. I don't see any patterns at this time that would introduce data loss or anything like that. Thanks for fixing! Will leave up to @exceptionfactory to give a final approval since he currently has the PR marked with 'Request changes'.
Summary
NIFI-15464 - ConsumeKafka - Commit pending offsets for revoked partitions
Problem
The
ConsumeKafkaprocessor usingKafka3ConnectionServicecauses duplicate message processing when a consumer group rebalance occurs (when a consumer starts or stops for example). This happens because theKafka3ConsumerService.onPartitionsRevoked()callback only performs a rollback instead of committing pending offsets before partitions are revoked.When a rebalance is triggered:
onPartitionsRevoked()before the consumer loses ownership of its partitionsRebalanceInProgressExceptionThis issue did not occur with the legacy
ConsumeKafka_2_6processor in NiFi 1.x.Root Cause
In NiFi 1.x, the
ConsumerLeaseclass implementedConsumerRebalanceListenerand had direct access to both the uncommitted offsets (tracked internally) and theProcessSession. WhenonPartitionsRevoked()was called, it would commit pending offsets before partitions were revoked.For reference:
In NiFi 2.x, the architecture changed:
Kafka3ConsumerServicehandles Kafka consumer operations and implementsConsumerRebalanceListenerConsumeKafkaprocessor handles session/FlowFile operations and offset tracking (viaOffsetTracker)Kafka3ConsumerServiceto commit pending offsets duringonPartitionsRevoked()because it didn't track themSolution
This PR restores the NiFi 1.x behavior by tracking uncommitted offsets internally within
Kafka3ConsumerService:ConcurrentHashMap<TopicPartition, Long>to track the maximum offset for each partition as records are polledpoll()to update the tracked offsets for each record consumedonPartitionsRevoked()to commit the tracked offsets for revoked partitions before they are taken awaycommit()to clear tracked offsets for partitions that have been committedrollback()to clear tracked offsets for partitions being rolled backThis approach mirrors what was done in
ConsumerLease.uncommittedOffsetsMapin NiFi 1.x.Important Note
While this fix restores correct behavior, a cleaner architectural approach would be to introduce a callback mechanism where the processor can be notified during
onPartitionsRevoked()and provide the offsets to commit. This would:This larger refactor could be done as a follow-up iteration if desired. This would increase the coupling between processors and the service layer so not sure this is an awesome idea.
Testing
I spent some time adding integration testing and it works locally. Depending on timing and such, it may reveal being flaky... If that's the case, we can drop the new IT class completely but let's see...
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000VerifiedstatusPull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation