Fix: Handle kafka topic not found exception in fetchPartitionCount fu…#17669
Fix: Handle kafka topic not found exception in fetchPartitionCount fu…#17669Abhishek01911 wants to merge 16 commits intoapache:masterfrom
Conversation
cbalci
left a comment
There was a problem hiding this comment.
I think it would be best to guard this logic with a stream config like skip.missing.topics=true which is false by default.
| public static final String GROUP_ID = "hlc.group.id"; | ||
| public static final String PARTITION_MSG_OFFSET_FACTORY_CLASS = "partition.offset.factory.class.name"; | ||
| public static final String TOPIC_CONSUMPTION_RATE_LIMIT = "topic.consumption.rate.limit"; | ||
| public static final String SKIP_MISSING_TOPICS = "skip.missing.topics"; |
There was a problem hiding this comment.
This isn't a generic property really. Can we create a new section for multitopic related configs and place it there with proper prefix and explanation?
Something like
MULTITOPIC_SKIP_MISSING_TOPICS = "multitopic.skip.missing.topics";
| // Check if the topic exists before fetching partition metadata | ||
| // Only perform this check if topic existence validation is enabled and topics were fetched |
There was a problem hiding this comment.
Since we renamed the config (and the variable), please update the comment accordingly.
| throws Exception { | ||
| int numStreams = _streamConfigs.size(); | ||
|
|
||
| // Fetch available topics once and reuse across all streams (for topic existence validation) |
There was a problem hiding this comment.
Please clarify this comment
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #17669 +/- ##
============================================
- Coverage 63.22% 63.21% -0.02%
+ Complexity 1499 1454 -45
============================================
Files 3174 3176 +2
Lines 190319 191029 +710
Branches 29080 29207 +127
============================================
+ Hits 120338 120764 +426
- Misses 60643 60859 +216
- Partials 9338 9406 +68
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
f082bb6 to
ff8c650
Compare
ff8c650 to
c2ba4eb
Compare
Summary
Resolves Issue #17045
This PR fixes an issue where an exception in fetchPartitionCount, often caused by a deleted topic, would halt ingestion for an entire multi-topic table. By catching this exception, the failure is now isolated. Partitions for the deleted topic will get stuck in a "consuming" state, but ingestion from all other valid topics will continue unaffected.
This change only impacts multi topic tables only when property - skip.missing.topics is set as true.
Testing
Tested on multi Topic table
Validated multi topic table was able to ingest data even after kafka topic was deleted / invalid kafka topic was added