feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic} metrics #1747

Draft
shashank-reddy-nr wants to merge 3 commits into newrelic:main from shashank-reddy-nr:feat/kafka-cluster-id

@shashank-reddy-nr

What

Adds per-cluster, per-topic metrics for both kafka-python-ng and confluent-kafka.

kafka-python-ng (messagebroker_kafkapython.py)

  • _fetch_kafka_cluster_id(): calls KafkaAdminClient.describe_cluster() in a
    daemon thread, using double-checked locking and a sentinel ("") to prevent
    duplicate thread spawns.
  • The full client config is filtered through the _ADMIN_CLIENT_ALLOWED_KEYS
    allowlist: SASL/SSL credentials are inherited, and consumer-only keys are
    excluded to prevent a TypeError.
  • wrap_KafkaConsumer_init_cluster: proactive cluster ID fetch on KafkaConsumer
    init (symmetric with the producer).
  • The cluster metric is recorded in both wrap_KafkaProducer_send and
    wrap_kafkaconsumer_next using _cache_key (not key), so the message routing
    key is not corrupted.

confluent-kafka (messagebroker_confluentkafka.py)

  • _fetch_cluster_id() already uses instance.list_topics(); it now sets
    _nr_bootstrap_servers before the fetch for SerializingProducer and
    DeserializingConsumer (previously missing, which caused uncached thread
    spawns).

Tests

  • Unit: test_cluster_metrics_unit.py covers the key-preservation regression
    and the auth config allowlist; the mock target is kafka.KafkaAdminClient.
  • Integration: kafka-python-producer and kafka-python-consumer validated in
    NR staging (account 11833718).

Adds per-cluster, per-topic produce and consume metrics that uniquely identify
Kafka clusters by their UUID (cluster ID). These complement the existing per-node
MessageBroker/Kafka/Nodes/{server}/... metrics by collapsing all broker addresses
of the same cluster into a single metric, enabling cluster-level throughput
analysis across MSK, Confluent Cloud, and self-hosted Kafka.

Metric format:
  MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Produce
  MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Consume
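
As a rough illustration of the format above, the following sketch builds the metric name and shows how records from different broker addresses of one cluster land under a single key. The function name and the sample cluster ID, topic, and broker addresses are made up for the example; only the metric path layout comes from this PR.

```python
from collections import Counter


def cluster_topic_metric(cluster_id, topic_name, action):
    """Build a cluster-level metric name; action is "Produce" or "Consume"."""
    if action not in ("Produce", "Consume"):
        raise ValueError(f"unsupported action: {action!r}")
    return f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/{action}"


# Hypothetical sample data: three sends, via two brokers of the same cluster.
sends = [
    ("broker-1:9092", "orders"),
    ("broker-2:9092", "orders"),
    ("broker-1:9092", "orders"),
]
cluster_id = "example-cluster-id"  # placeholder, not a real cluster UUID

counts = Counter(
    cluster_topic_metric(cluster_id, topic, "Produce") for _server, topic in sends
)
```

Here `counts` holds one key for the whole cluster, whereas a per-node metric would split the same sends across two broker addresses.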

The cluster ID is fetched automatically using the client's own authenticated
connection — no extra configuration or credentials needed.

Also includes:
- Unit and integration tests for all new code paths
- Bug fixes identified in code review (volatile fields, thread-safety,
  per-message vs per-poll counting, auth config passthrough)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@shashank-reddy-nr shashank-reddy-nr requested a review from a team as a code owner June 1, 2026 10:44
@shashank-reddy-nr shashank-reddy-nr marked this pull request as draft June 1, 2026 10:52
@mergify mergify Bot added the tests-failing Tests failing in CI. label Jun 1, 2026
@codecov-commenter

codecov-commenter commented Jun 1, 2026

Codecov Report

❌ Patch coverage is 67.39130% with 45 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.93%. Comparing base (949339a) to head (ae44208).
⚠️ Report is 1 commit behind head on main.

Files with missing lines                          Patch %   Lines
newrelic/hooks/messagebroker_kafkapython.py       63.52%    21 Missing and 10 partials ⚠️
newrelic/hooks/messagebroker_confluentkafka.py    73.07%    8 Missing and 6 partials ⚠️

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1747      +/-   ##
==========================================
- Coverage   82.08%   81.93%   -0.16%     
==========================================
  Files         215      215              
  Lines       26309    26460     +151     
  Branches     4150     4178      +28     
==========================================
+ Hits        21596    21680      +84     
- Misses       3301     3348      +47     
- Partials     1412     1432      +20     

shashank-reddy-nr and others added 2 commits June 1, 2026 17:51
- .NET: Replace System.Text.Json (unavailable on net462 without NuGet) with
  simple string extraction for newrelic header parsing — no external dependency
- Node.js: Fix lint violations — empty catch blocks now log debug messages,
  extract injectHeaders() helper to reduce cognitive complexity, remove unused
  variables from test, add JSDoc @param description
- Python: Remove proactive cluster ID fetch from KafkaConsumer.__init__ to fix
  race condition where daemon thread overwrote seeded test fixture values;
  reactive fetch on first consumed message is sufficient

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

The seeded_cluster_id fixture pre-seeds _kafka_cluster_id_cache to make the
cluster-ID tests deterministic, but constructing a producer/consumer spawns a
daemon thread (_fetch_kafka_cluster_id) that calls describe_cluster() against the
test broker. That returns no cluster_id, so the thread pops the cache key and
deletes the seeded value. The GIL hid this race; on free-threaded builds the
thread runs in parallel and clobbers the seed, failing test_cluster_produce_metric
and test_cluster_id_attribute_on_transaction for the later parametrizations.

Add an autouse fixture that no-ops the async fetch for the integration tests so the
seeded value is the only writer. The real fetch remains covered by
test_cluster_metrics_unit.py, which the fixture skips.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@mergify mergify Bot removed the tests-failing Tests failing in CI. label Jun 1, 2026
@hmstepanek hmstepanek self-requested a review June 1, 2026 22:10