feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic} metrics #1747
Draft
shashank-reddy-nr wants to merge 3 commits into
Adds per-cluster, per-topic produce and consume metrics that uniquely identify
Kafka clusters by their UUID (cluster ID). These complement the existing per-node
MessageBroker/Kafka/Nodes/{server}/... metrics by collapsing all broker addresses
of the same cluster into a single metric, enabling cluster-level throughput
analysis across MSK, Confluent Cloud, and self-hosted Kafka.
Metric format:
MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Produce
MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Consume
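To make the naming scheme concrete, here is a minimal sketch of how such names could be assembled. The helper `cluster_topic_metric` is hypothetical and is not taken from the PR; only the metric format itself comes from the description above.

```python
def cluster_topic_metric(cluster_id: str, topic: str, action: str) -> str:
    """Build a per-cluster, per-topic metric name (hypothetical helper)."""
    if action not in ("Produce", "Consume"):
        raise ValueError(f"unsupported action: {action!r}")
    return f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic}/{action}"


# Two brokers of the same cluster share one cluster ID, so they map to one metric.
name_from_broker_a = cluster_topic_metric("example-cluster-uuid", "orders", "Produce")
name_from_broker_b = cluster_topic_metric("example-cluster-uuid", "orders", "Produce")
```

Because the name is keyed on the cluster ID rather than a broker address, the per-node fan-out of the existing Nodes/{server} metrics does not occur here.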
The cluster ID is fetched automatically using the client's own authenticated
connection — no extra configuration or credentials needed.
Also includes:
- Unit and integration tests for all new code paths
- Bug fixes identified in code review (volatile fields, thread-safety,
per-message vs per-poll counting, auth config passthrough)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Codecov Report ❌
@@            Coverage Diff             @@
##             main    #1747      +/-   ##
==========================================
- Coverage   82.08%   81.93%   -0.16%
==========================================
  Files         215      215
  Lines       26309    26460     +151
  Branches     4150     4178      +28
==========================================
+ Hits        21596    21680      +84
- Misses       3301     3348      +47
- Partials     1412     1432      +20
- .NET: Replace System.Text.Json (unavailable on net462 without NuGet) with
simple string extraction for newrelic header parsing; no external dependency
- Node.js: Fix lint violations: empty catch blocks now log debug messages,
extract injectHeaders() helper to reduce cognitive complexity, remove unused
variables from test, add JSDoc @param description
- Python: Remove proactive cluster ID fetch from KafkaConsumer.__init__ to fix
race condition where daemon thread overwrote seeded test fixture values;
reactive fetch on first consumed message is sufficient
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The seeded_cluster_id fixture pre-seeds _kafka_cluster_id_cache to make the
cluster-ID tests deterministic, but constructing a producer/consumer spawns a
daemon thread (_fetch_kafka_cluster_id) that calls describe_cluster() against
the test broker. That returns no cluster_id, so the thread pops the cache key
and deletes the seeded value. The GIL hid this race; on free-threaded builds
the thread runs in parallel and clobbers the seed, failing
test_cluster_produce_metric and test_cluster_id_attribute_on_transaction for
the later parametrizations.
Add an autouse fixture that no-ops the async fetch for the integration tests so
the seeded value is the only writer. The real fetch remains covered by
test_cluster_metrics_unit.py, which the fixture skips.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
What
Adds per-cluster, per-topic metrics for both kafka-python-ng and confluent-kafka.
kafka-python-ng (messagebroker_kafkapython.py)
thread with double-checked-locking + sentinel ("") to prevent duplicate spawns.
credentials inherited, consumer-only keys excluded to prevent TypeError.
init (symmetric with producer).
wrap_kafkaconsumer_next using _cache_key (not key), so the message routing key is not corrupted.
confluent-kafka (messagebroker_confluentkafka.py)
_nr_bootstrap_servers before fetch for SerializingProducer and
DeserializingConsumer (was missing, caused uncached thread spawns).
Tests
allowlist, mock target uses kafka.KafkaAdminClient.
NR staging (account 11833718).
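The allowlist referred to in the test notes, together with the "consumer-only keys excluded" item earlier, suggests a filter along these lines. The exact set of keys used in the PR is not shown, so `ADMIN_CONFIG_ALLOWLIST` and `admin_config_from` below are assumptions; the key names themselves are ordinary kafka-python configuration options.

```python
# Hypothetical allowlist: connection and auth settings an admin client would need.
ADMIN_CONFIG_ALLOWLIST = frozenset({
    "bootstrap_servers",
    "client_id",
    "security_protocol",
    "sasl_mechanism",
    "sasl_plain_username",
    "sasl_plain_password",
    "ssl_cafile",
    "ssl_certfile",
    "ssl_keyfile",
})


def admin_config_from(client_config):
    """Keep only allowlisted, non-None settings from a producer/consumer config."""
    return {
        key: value
        for key, value in client_config.items()
        if key in ADMIN_CONFIG_ALLOWLIST and value is not None
    }


consumer_config = {
    "bootstrap_servers": "broker:9092",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "PLAIN",
    "sasl_plain_username": "user",
    "sasl_plain_password": "secret",
    "group_id": "orders-workers",      # consumer-only, must not reach the admin client
    "auto_offset_reset": "earliest",   # consumer-only
}
admin_config = admin_config_from(consumer_config)
```

An allowlist fails closed: a consumer option added in a future client version is dropped by default rather than passed through to a constructor that rejects it.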