NIFI-15464 - ConsumeKafka - Commit pending offsets for revoked partitions #10769
Merged
New file (360 additions, 0 deletions): ...3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java
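The tests exercise two `Kafka3ConsumerService` methods whose implementations are outside this diff: `onPartitionsRevoked`, the rebalance callback, and `commitOffsetsForRevokedPartitions`, the commit hook named in the PR title. Below is a minimal sketch of the pattern the tests imply, assuming the service tracks the next offset to commit per partition. The class and field names are illustrative, not the actual NiFi implementation, and a real implementation must also ensure the commit happens while the consumer is still a valid group member:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: park uncommitted offsets when partitions are revoked,
// then commit them once the caller reports that processing succeeded.
class RevokedOffsetTracker {
    private final Consumer<byte[], byte[]> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> uncommitted = new HashMap<>();
    private final Map<TopicPartition, OffsetAndMetadata> pendingRevoked = new HashMap<>();

    RevokedOffsetTracker(final Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    // Record the next offset to commit as each polled record is handed off
    void trackPolled(final TopicPartition partition, final long offset) {
        uncommitted.put(partition, new OffsetAndMetadata(offset + 1));
    }

    // Rebalance callback: move offsets for revoked partitions into a pending set
    // rather than committing immediately, since processing may still be in flight
    void onPartitionsRevoked(final Set<TopicPartition> revoked) {
        for (final TopicPartition partition : revoked) {
            final OffsetAndMetadata offset = uncommitted.remove(partition);
            if (offset != null) {
                pendingRevoked.put(partition, offset);
            }
        }
    }

    // Called after processing succeeds: commit the parked offsets so the
    // partitions' next owner does not reprocess the same records
    void commitOffsetsForRevokedPartitions() {
        if (!pendingRevoked.isEmpty()) {
            consumer.commitSync(new HashMap<>(pendingRevoked));
            pendingRevoked.clear();
        }
    }
}
```

Offsets that are never committed this way are simply re-served to the partition's next assignee, which is what the data-loss test below relies on.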
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.kafka.processors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
import org.apache.nifi.kafka.service.consumer.Subscription;
import org.apache.nifi.logging.ComponentLog;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

/**
 * Integration tests for verifying that ConsumeKafka correctly handles consumer group rebalances
 * without causing duplicate message processing.
 */
class ConsumeKafkaRebalanceIT extends AbstractConsumeKafkaIT {

    private static final int NUM_PARTITIONS = 3;
    private static final int MESSAGES_PER_PARTITION = 20;

    /**
     * Tests that when onPartitionsRevoked is called (simulating rebalance), the consumer
     * correctly commits offsets, and a subsequent consumer in the same group doesn't
     * re-consume the same messages (no duplicates).
     *
     * This test:
     * 1. Produces messages to a multi-partition topic
     * 2. Consumer 1 polls and processes messages
     * 3. Simulates rebalance by calling onPartitionsRevoked on Consumer 1
     * 4. Consumer 2 joins and continues consuming from committed offsets
     * 5. Verifies no duplicate messages were consumed
     */
    @Test
    @Timeout(value = 60, unit = TimeUnit.SECONDS)
    void testRebalanceDoesNotCauseDuplicates() throws Exception {
        final String topic = "rebalance-test-" + UUID.randomUUID();
        final String groupId = "rebalance-group-" + UUID.randomUUID();
        final int totalMessages = NUM_PARTITIONS * MESSAGES_PER_PARTITION;

        createTopic(topic, NUM_PARTITIONS);
        produceMessagesToTopic(topic, NUM_PARTITIONS, MESSAGES_PER_PARTITION);

        final Set<String> consumedMessages = new HashSet<>();
        final AtomicInteger duplicateCount = new AtomicInteger(0);
        final ComponentLog mockLog = mock(ComponentLog.class);

        final Properties props1 = getConsumerProperties(groupId);
        try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new KafkaConsumer<>(props1)) {
            final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
            final Kafka3ConsumerService service1 = new Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);

            int consumer1Count = 0;
            int maxAttempts = 20;
            while (consumer1Count < totalMessages / 2 && maxAttempts-- > 0) {
                for (ByteRecord record : service1.poll(Duration.ofSeconds(2))) {
                    final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
                    if (!consumedMessages.add(messageId)) {
                        duplicateCount.incrementAndGet();
                    }
                    consumer1Count++;
                }
            }

            final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
            service1.onPartitionsRevoked(assignment);
            // Simulate processor committing offsets after successful session commit
            service1.commitOffsetsForRevokedPartitions();
            service1.close();
        }

        final Properties props2 = getConsumerProperties(groupId);
        try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new KafkaConsumer<>(props2)) {
            final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
            final Kafka3ConsumerService service2 = new Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);

            int emptyPolls = 0;
            while (emptyPolls < 5 && consumedMessages.size() < totalMessages) {
                boolean hasRecords = false;
                for (ByteRecord record : service2.poll(Duration.ofSeconds(2))) {
                    hasRecords = true;
                    final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
                    if (!consumedMessages.add(messageId)) {
                        duplicateCount.incrementAndGet();
                    }
                }
                if (!hasRecords) {
                    emptyPolls++;
                } else {
                    emptyPolls = 0;
                }
            }

            service2.close();
        }

        assertEquals(0, duplicateCount.get(),
                "Expected no duplicate messages but found " + duplicateCount.get());
        assertEquals(totalMessages, consumedMessages.size(),
                "Expected to consume " + totalMessages + " unique messages but got " + consumedMessages.size());
    }

    /**
     * Tests that offsets can be committed after rebalance when processor calls commitOffsetsForRevokedPartitions.
     *
     * This test:
     * 1. Creates a consumer and polls messages
     * 2. Manually invokes onPartitionsRevoked (simulating what Kafka does during rebalance)
     * 3. Calls commitOffsetsForRevokedPartitions (simulating processor committing after session commit)
     * 4. Verifies that offsets were committed to Kafka
     */
    @Test
    @Timeout(value = 60, unit = TimeUnit.SECONDS)
    void testOffsetsCommittedDuringRebalance() throws Exception {
        final String topic = "rebalance-offset-test-" + UUID.randomUUID();
        final String groupId = "rebalance-offset-group-" + UUID.randomUUID();
        final int messagesPerPartition = 10;

        createTopic(topic, NUM_PARTITIONS);
        produceMessagesToTopic(topic, NUM_PARTITIONS, messagesPerPartition);

        final ComponentLog mockLog = mock(ComponentLog.class);
        final Properties props = getConsumerProperties(groupId);

        try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props)) {
            final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
            final Kafka3ConsumerService service = new Kafka3ConsumerService(mockLog, kafkaConsumer, subscription);

            int polledCount = 0;
            int maxAttempts = 20;
            while (polledCount < 15 && maxAttempts-- > 0) {
                for (ByteRecord ignored : service.poll(Duration.ofSeconds(2))) {
                    polledCount++;
                }
            }

            assertTrue(polledCount > 0, "Should have polled at least some messages");

            final Set<TopicPartition> assignment = kafkaConsumer.assignment();
            assertFalse(assignment.isEmpty(), "Consumer should have partition assignments");

            service.onPartitionsRevoked(assignment);
            // Simulate processor committing offsets after successful session commit
            service.commitOffsetsForRevokedPartitions();
            service.close();
        }

        try (KafkaConsumer<byte[], byte[]> verifyConsumer = new KafkaConsumer<>(getConsumerProperties(groupId))) {
            final Set<TopicPartition> partitions = new HashSet<>();
            for (int i = 0; i < NUM_PARTITIONS; i++) {
                partitions.add(new TopicPartition(topic, i));
            }

            final Map<TopicPartition, OffsetAndMetadata> committedOffsets = verifyConsumer.committed(partitions);

            long totalCommitted = committedOffsets.values().stream()
                    .filter(o -> o != null)
                    .mapToLong(OffsetAndMetadata::offset)
                    .sum();

            assertTrue(totalCommitted > 0,
                    "Expected offsets to be committed after commitOffsetsForRevokedPartitions, but total committed offset was " + totalCommitted);
        }
    }

    /**
     * Tests that records are NOT lost when a rebalance occurs before processing is complete.
     *
     * This test simulates the scenario where:
     * 1. Consumer polls and iterates through records (tracking offsets internally)
     * 2. Rebalance occurs (onPartitionsRevoked called) BEFORE the processor commits its session
     * 3. Consumer "fails" (simulating crash or processing failure) without committing offsets
     * 4. New consumer joins with the same group
     * 5. The new consumer receives the same records since they were never successfully processed
     */
    @Test
    @Timeout(value = 60, unit = TimeUnit.SECONDS)
    void testNoDataLossWhenRebalanceOccursBeforeProcessingComplete() throws Exception {
        final String topic = "dataloss-test-" + UUID.randomUUID();
        final String groupId = "dataloss-group-" + UUID.randomUUID();
        final int messagesPerPartition = 10;
        final int totalMessages = NUM_PARTITIONS * messagesPerPartition;

        createTopic(topic, NUM_PARTITIONS);
        produceMessagesToTopic(topic, NUM_PARTITIONS, messagesPerPartition);

        final ComponentLog mockLog = mock(ComponentLog.class);
        int recordsPolledByFirstConsumer = 0;

        // Consumer 1: Poll and iterate records, then rebalance occurs, but processing "fails"
        final Properties props1 = getConsumerProperties(groupId);
        try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new KafkaConsumer<>(props1)) {
            final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
            final Kafka3ConsumerService service1 = new Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription);

            // Poll and iterate through records - this tracks offsets internally
            int maxAttempts = 20;
            while (recordsPolledByFirstConsumer < totalMessages && maxAttempts-- > 0) {
                for (ByteRecord ignored : service1.poll(Duration.ofSeconds(2))) {
                    recordsPolledByFirstConsumer++;
                }
            }

            assertTrue(recordsPolledByFirstConsumer > 0, "First consumer should have polled some records");

            // Simulate rebalance occurring before processor commits its session
            final Set<TopicPartition> assignment = kafkaConsumer1.assignment();
            assertFalse(assignment.isEmpty(), "Consumer should have partition assignments");
            service1.onPartitionsRevoked(assignment);

            // DO NOT call any "commit" or "process" method - simulating that the processor
            // never completed processing (e.g., session commit failed, process crashed, etc.)

            service1.close();
        }

        // Consumer 2: Should receive the SAME records because processing was never completed
        int recordsPolledBySecondConsumer = 0;
        final Properties props2 = getConsumerProperties(groupId);
        try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new KafkaConsumer<>(props2)) {
            final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
            final Kafka3ConsumerService service2 = new Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription);

            // Poll for records - if no data loss, we should get the same records again
            int emptyPolls = 0;
            while (emptyPolls < 5) {
                boolean hasRecords = false;
                for (ByteRecord ignored : service2.poll(Duration.ofSeconds(2))) {
                    hasRecords = true;
                    recordsPolledBySecondConsumer++;
                }
                if (!hasRecords) {
                    emptyPolls++;
                } else {
                    emptyPolls = 0;
                }
            }

            service2.close();
        }

        // Records should NOT be lost - the second consumer should receive
        // at least the records that were polled by the first consumer but never processed
        assertTrue(recordsPolledBySecondConsumer >= recordsPolledByFirstConsumer,
                "Data loss detected! First consumer polled " + recordsPolledByFirstConsumer +
                " records but second consumer only received " + recordsPolledBySecondConsumer +
                " records. Expected second consumer to receive at least " + recordsPolledByFirstConsumer +
                " records since processing was never completed.");
    }

    /**
     * Produces messages to a specific topic with a given number of partitions.
     */
    private void produceMessagesToTopic(final String topic, final int numPartitions, final int messagesPerPartition) throws Exception {
        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            for (int partition = 0; partition < numPartitions; partition++) {
                for (int i = 0; i < messagesPerPartition; i++) {
                    final String key = "key-" + partition + "-" + i;
                    final String value = "value-" + partition + "-" + i;
                    producer.send(new ProducerRecord<>(topic, partition, key, value)).get();
                }
            }
        }
    }

    private void createTopic(final String topic, final int numPartitions) throws Exception {
        final Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());

        try (Admin admin = Admin.create(adminProps)) {
            final NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1);
            admin.createTopics(Collections.singletonList(newTopic)).all().get(30, TimeUnit.SECONDS);
            waitForTopicReady(admin, topic, numPartitions);
        }
    }

    private void waitForTopicReady(final Admin admin, final String topic, final int expectedPartitions) throws Exception {
        final long startTime = System.currentTimeMillis();
        final long timeoutMillis = 30000;

        while (System.currentTimeMillis() - startTime < timeoutMillis) {
            try {
                final Map<String, TopicDescription> descriptions = admin.describeTopics(Collections.singletonList(topic))
                        .allTopicNames()
                        .get(10, TimeUnit.SECONDS);
                final TopicDescription description = descriptions.get(topic);
                if (description != null && description.partitions().size() == expectedPartitions) {
                    return;
                }
            } catch (ExecutionException ignored) {
                // Topic not ready yet, continue polling
            }
            Thread.sleep(100);
        }
        throw new RuntimeException("Topic " + topic + " not ready after " + timeoutMillis + "ms");
    }

    private Properties getConsumerProperties(final String groupId) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
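        // Auto-commit is disabled so offsets reach Kafka only through the service
        // under test, matching the explicit commit-after-processing flow the tests simulate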
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        // Use shorter session timeout to speed up rebalance detection
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        return props;
    }
}
```
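Taken together, the assertions encode an at-least-once contract: offsets for revoked partitions are committed only after records have been handed off successfully, so a rebalance may cause redelivery (third test) but never silent loss, while a completed hand-off prevents duplicates (first test). As a hypothetical sketch of how a processor might order the two commits, using the NiFi `ProcessSession.commitAsync(Runnable)` callback and an assumed `consumerService` reference (not the actual processor code):

```java
// Hypothetical wiring: commit the NiFi session first, and only on success
// commit the Kafka offsets held back for revoked partitions.
session.commitAsync(consumerService::commitOffsetsForRevokedPartitions);
```

If the session commit fails, the held offsets stay uncommitted and the records are redelivered to the partitions' next owner.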