Fix: Handle kafka topic not found exception in fetchPartitionCount fu…#17669

Open
Abhishek01911 wants to merge 16 commits into apache:master from Abhishek01911:exception_fix1

@Abhishek01911 Abhishek01911 commented Feb 9, 2026

Summary

Resolves Issue #17045

This PR fixes an issue where an exception thrown by fetchPartitionCount, often caused by a deleted topic, would halt ingestion for an entire multi-topic table. The exception is now caught, so the failure is isolated: partitions for the deleted topic remain stuck in a "consuming" state, but ingestion from all other valid topics continues unaffected.
This change affects only multi-topic tables, and only when the property skip.missing.topics is set to true.
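
To illustrate the isolation described above, here is a minimal sketch, not the code in this PR. The `PartitionCountFetcher` interface, the `fetchAll` helper, and the topic names are hypothetical stand-ins; only the idea of catching a per-topic failure behind a `skip.missing.topics`-style flag comes from the description.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SkipMissingTopicsSketch {
  /** Hypothetical stand-in for a per-topic metadata lookup; not Pinot's actual interface. */
  public interface PartitionCountFetcher {
    int fetchPartitionCount(String topic) throws Exception;
  }

  /**
   * Collects partition counts for every topic that can be resolved.
   * With skipMissingTopics=false the first failure propagates and aborts the whole loop;
   * with skipMissingTopics=true the failing topic is recorded in 'skipped' and the loop continues.
   */
  public static Map<String, Integer> fetchAll(List<String> topics, PartitionCountFetcher fetcher,
      boolean skipMissingTopics, List<String> skipped)
      throws Exception {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (String topic : topics) {
      try {
        counts.put(topic, fetcher.fetchPartitionCount(topic));
      } catch (Exception e) {
        if (!skipMissingTopics) {
          throw e;
        }
        // Isolate the failure: remember the topic and keep going with the others.
        skipped.add(topic);
      }
    }
    return counts;
  }

  public static void main(String[] args)
      throws Exception {
    // Assumed behavior for illustration: one topic has been deleted and its lookup throws.
    PartitionCountFetcher fetcher = topic -> {
      if (topic.equals("deleted-topic")) {
        throw new IllegalStateException("Topic not found: " + topic);
      }
      return 4;
    };
    List<String> skipped = new ArrayList<>();
    Map<String, Integer> counts =
        fetchAll(List.of("orders", "deleted-topic", "payments"), fetcher, true, skipped);
    System.out.println("counts=" + counts + " skipped=" + skipped);
  }
}
```

In this sketch the flag defaults to nothing in particular; the caller decides, which keeps the fail-fast path available when skipping is not wanted.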

Testing

Tested on a multi-topic table.
Validated that the multi-topic table continued to ingest data after a Kafka topic was deleted or an invalid Kafka topic was added.

@Jackie-Jiang Jackie-Jiang added the kafka, bugfix, real-time, and ingestion labels Feb 9, 2026
Comment thread pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java Outdated
Contributor

@cbalci cbalci left a comment

I think it would be best to guard this logic with a stream config like skip.missing.topics=true which is false by default.
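
A minimal sketch of what such a guard could look like, assuming the stream config is available as a plain `Map<String, String>`. The class and the `isSkipMissingTopics` helper are hypothetical; only the key name comes from the suggestion above.

```java
import java.util.HashMap;
import java.util.Map;

public class SkipMissingTopicsConfigSketch {
  // Key name taken from the review suggestion; its final name and location are not settled here.
  public static final String SKIP_MISSING_TOPICS = "skip.missing.topics";

  /**
   * Reads the flag from a string map. Boolean.parseBoolean returns false for null
   * and for anything other than "true" (case-insensitive), so the default is false.
   */
  public static boolean isSkipMissingTopics(Map<String, String> streamConfigMap) {
    return Boolean.parseBoolean(streamConfigMap.get(SKIP_MISSING_TOPICS));
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    System.out.println("unset: " + isSkipMissingTopics(config));
    config.put(SKIP_MISSING_TOPICS, "true");
    System.out.println("set to true: " + isSkipMissingTopics(config));
  }
}
```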

public static final String GROUP_ID = "hlc.group.id";
public static final String PARTITION_MSG_OFFSET_FACTORY_CLASS = "partition.offset.factory.class.name";
public static final String TOPIC_CONSUMPTION_RATE_LIMIT = "topic.consumption.rate.limit";
public static final String SKIP_MISSING_TOPICS = "skip.missing.topics";
Contributor

@cbalci cbalci Feb 24, 2026

This isn't really a generic property. Can we create a new section for multitopic-related configs and place it there with a proper prefix and explanation?
Something like:

MULTITOPIC_SKIP_MISSING_TOPICS = "multitopic.skip.missing.topics";

Comment on lines +131 to +132
// Check if the topic exists before fetching partition metadata
// Only perform this check if topic existence validation is enabled and topics were fetched
Contributor

Since we renamed the config (and the variable), please update the comment accordingly.

throws Exception {
int numStreams = _streamConfigs.size();

// Fetch available topics once and reuse across all streams (for topic existence validation)
Contributor

Please clarify this comment
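
For context, the code comment under review appears to describe a fetch-once, reuse-many pattern: list the broker's topics a single time, then check each stream's topic against that set. A minimal sketch under that assumption follows; the `existingTopics` helper and the `Supplier`-based topic lister are hypothetical and do not reflect the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class TopicExistenceCheckSketch {
  /**
   * Returns the configured topics that exist, in their configured order.
   * The (potentially expensive) topic listing is invoked exactly once,
   * and the resulting set is reused for every configured topic.
   */
  public static List<String> existingTopics(List<String> configuredTopics,
      Supplier<Set<String>> topicLister) {
    Set<String> available = topicLister.get(); // single lookup, shared by all streams
    List<String> present = new ArrayList<>();
    for (String topic : configuredTopics) {
      if (available.contains(topic)) {
        present.add(topic);
      }
    }
    return present;
  }

  public static void main(String[] args) {
    // Assumed data for illustration: "payments" is configured but no longer exists.
    List<String> present =
        existingTopics(List.of("orders", "payments", "clicks"), () -> Set.of("orders", "clicks"));
    System.out.println("present=" + present);
  }
}
```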

codecov-commenter commented Feb 24, 2026

Codecov Report

❌ Patch coverage is 74.07407% with 7 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.21%. Comparing base (a5eae57) to head (8420429).
⚠️ Report is 69 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...inot/spi/stream/PartitionGroupMetadataFetcher.java | 74.07% | 6 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17669      +/-   ##
============================================
- Coverage     63.22%   63.21%   -0.02%     
+ Complexity     1499     1454      -45     
============================================
  Files          3174     3176       +2     
  Lines        190319   191029     +710     
  Branches      29080    29207     +127     
============================================
+ Hits         120338   120764     +426     
- Misses        60643    60859     +216     
- Partials       9338     9406      +68     

| Flag | Coverage Δ |
|---|---|
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-11 | 63.18% <74.07%> (+7.59%) ⬆️ |
| java-21 | 63.18% <74.07%> (-0.04%) ⬇️ |
| temurin | 63.21% <74.07%> (-0.02%) ⬇️ |
| unittests | 63.21% <74.07%> (-0.02%) ⬇️ |
| unittests1 | 55.59% <74.07%> (-0.02%) ⬇️ |
| unittests2 | 34.06% <0.00%> (+<0.01%) ⬆️ |

@xiangfu0 xiangfu0 removed the bugfix label Mar 20, 2026
@xiangfu0 xiangfu0 added the bug label Mar 20, 2026