[Backport 7.79.x] [kafka_consumer] Use AdminClient.list_offsets for earliest fetch and isolate its failures#23665
Merged
Conversation
…isolate its failures (#23580)

* [kafka_consumer] Use AdminClient.list_offsets for earliest fetch and isolate its failures

  Replace the LOW_WATERMARK fetch in `_collect_topic_metadata` (which went through `Consumer.offsets_for_times(timestamp=0)` and forced the broker to walk `.timeindex` segment files) with `AdminClient.list_offsets` and `OffsetSpec.earliest()`. The broker services this from in-memory `logStartOffset` instead, eliminating the time-index scan that was timing out for clusters with many segments x multi-broker fan-out.

  Wrap the call so a failure no longer aborts the entire topic-metadata collection. When the earliest fetch fails (or returns errors per partition), only the metrics that genuinely depend on it are skipped: `partition.beginning_offset`, `partition.size`, and `topic.size`. Everything else - `topic.message_rate`, `topic.partitions`, `partition.isr/replicas/under_replicated/offline`, `topic.config.*`, and all consumer-side metrics - keeps emitting.

  Verified locally on a 20k-topic / 40k-partition / 20k-consumer-group cluster: with the failure simulated, sample volume drops from 480k to 380k (only the 100k earliest-dependent samples lost) instead of the prior 300k+ loss when the whole `_collect_topic_metadata` aborted.

* Use actual PR number for changelog file

* Drop dead lowwater code path and rename get_watermark_offsets -> get_highwater_offsets

(cherry picked from commit 57b7286)
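For context, here is a minimal sketch of what the new earliest-offset fetch looks like with confluent-kafka's `AdminClient.list_offsets` and `OffsetSpec.earliest()`; the broker address, topic name, and partition count are placeholders, and this is not the check's actual code.

```python
# Hypothetical standalone example; assumes a confluent-kafka-python release
# that ships AdminClient.list_offsets (2.2.0 or later).
from confluent_kafka import TopicPartition
from confluent_kafka.admin import AdminClient, OffsetSpec

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder broker

# One OffsetSpec.earliest() per partition; the broker answers from its
# in-memory logStartOffset instead of scanning .timeindex segment files.
requests = {TopicPartition("my-topic", p): OffsetSpec.earliest() for p in range(3)}

futures = admin.list_offsets(requests)  # dict of TopicPartition -> Future
earliest = {}
for tp, future in futures.items():
    info = future.result()  # ListOffsetsResultInfo with .offset / .timestamp / .leader_epoch
    earliest[(tp.topic, tp.partition)] = info.offset

print(earliest)
```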
Validation Report
Passed validations (19)
🎉 All green! ❄️ No new flaky tests detected. 🔗 Commit SHA: 7f7ea2f
sarah-witt approved these changes on May 12, 2026
sarah-witt left a comment:
Approving the backport, but could you add a test for this case in a future PR?
Backport 57b7286 from #23580.
What does this PR do?
Two changes to `_collect_topic_metadata` in the kafka_consumer cluster monitoring path:

1. Switch the earliest-offset fetch to `AdminClient.list_offsets(OffsetSpec.earliest())`. The previous path went through `Consumer.offsets_for_times(timestamp=0)`, which the broker services by walking `.timeindex` segment files. The new path uses the `EARLIEST_TIMESTAMP` sentinel (-2), which the broker returns directly from in-memory `logStartOffset`.
2. Isolate the call's failures. A new `_fetch_earliest_offsets` helper catches request-level and per-partition failures and returns `{}` (or a partial map) instead of propagating. The per-partition loop now treats a missing earliest as "skip just the earliest-dependent metrics for this partition" rather than aborting the whole topic-metadata collection (see the sketch after this list).
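The sketch below illustrates the failure-isolation pattern described above. The helper name `_fetch_earliest_offsets` and the metric names are taken from this PR's description, but the body, broker address, and highwater values are illustrative assumptions, not the integration's actual implementation.

```python
from confluent_kafka import TopicPartition
from confluent_kafka.admin import AdminClient, OffsetSpec


def _fetch_earliest_offsets(admin, partitions):
    """Return {(topic, partition): earliest_offset}; failures yield a partial (or empty) map."""
    earliest = {}
    try:
        futures = admin.list_offsets(
            {TopicPartition(topic, part): OffsetSpec.earliest() for topic, part in partitions}
        )
    except Exception:
        # Request-level failure: return {} instead of propagating so the rest
        # of the topic-metadata collection still runs.
        return earliest
    for tp, future in futures.items():
        try:
            earliest[(tp.topic, tp.partition)] = future.result().offset
        except Exception:
            # Per-partition failure: skip only this partition.
            continue
    return earliest


if __name__ == "__main__":
    admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # placeholder broker
    highwater = {("my-topic", 0): 1200, ("my-topic", 1): 980}  # placeholder highwater offsets
    earliest_offsets = _fetch_earliest_offsets(admin, list(highwater))

    for (topic, partition), high in highwater.items():
        beginning = earliest_offsets.get((topic, partition))
        if beginning is None:
            # Missing earliest: skip only the earliest-dependent metrics
            # (partition.beginning_offset, partition.size) for this partition.
            continue
        print(f"partition.beginning_offset[{topic}/{partition}] = {beginning}")
        print(f"partition.size[{topic}/{partition}] = {high - beginning}")
```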
Motivation

Customer flare showed:
The `offsets_for_times(ts=0)` call timed out (large cluster, many segments per partition, multi-broker fan-out). Because the call sits at the top of `_collect_topic_metadata`, the entire function aborted, dropping `topic.message_rate`, `topic.partitions`, `partition.isr/replicas/under_replicated/offline`, `topic.config.*`, and `topic.size`; none of these structurally depend on earliest offsets.

Verification
Reproduced on a local 20k-topic / 40k-partition / 20k-consumer-group cluster with the timeout simulated:
Metrics checked: `topic.message_rate`, `topic.size`, `partition.size`. After the fix, only the 3 metrics that genuinely require earliest offsets are skipped on failure; everything else keeps emitting.
Review checklist (to be filled by the reviewer)
- Changelog entry created (`ddev release changelog new`)

🤖 Generated with Claude Code