[kafka] Initial commit to enable metric ingestion via Kafka input #18266
adrianchen-es wants to merge 3 commits into elastic:main from
Conversation
Vale Linting Results
Summary: 3 suggestions found 💡 Suggestions (3)
The Vale linter checks documentation changes against the Elastic Docs style guide. To use Vale locally or report issues, refer to the Elastic style guide for Vale.
Could you also add the recommended pipeline tests as part of the PR? ++ @stefans-elastic for reviewing the PR.
🚀 Benchmarks report
To see the full report, comment with
/test |
⏳ Build in-progress, with failures
Failed CI Steps
History
I am not 100% clear on the test failures for the kafka_log integration test - https://buildkite.com/elastic/integrations/builds/41142#019d7164-02ba-44b9-96d7-9e5ed4848465 - as a local test succeeds.
/test Check integrations kafka_log |
/test stack 9.3.2 |
@stefans-elastic, any pointers on the integration test error or other changes that are required to progress this?
Unfortunately I wasn't able to test it locally (because

Root cause: kafka_log.metrics CI test failure. In CI, all data stream tests in a package share the same Kafka broker, while locally each test gets its own fresh broker. The generic test runs first and consumes all messages from

Fix: At minimum, change `group_id: system_test` → `group_id: system_test_metrics` in `data_stream/metrics/_dev/test/system/test-kafka-config.yml`. Ideally also add a dedicated kafka-metrics
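The minimum fix might look like the following in `data_stream/metrics/_dev/test/system/test-kafka-config.yml` (a sketch only; the surrounding keys are assumptions based on typical system test configs, not taken from this PR):

```yaml
# test-kafka-config.yml (sketch; surrounding structure is an assumption)
data_stream:
  vars:
    # A dedicated consumer group so the metrics test does not share
    # committed offsets with the generic log test on the shared CI broker.
    group_id: system_test_metrics   # was: system_test
```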
@stefans-elastic, that was already done, but the results were the same - c787fbb. Both local logs and the CI test logs suggest new containers were created.
I will nevertheless update the consumer group and see if the CI improves.
@stefans-elastic, any insight? The test container is now at the data stream level, the test data is now tailored, and there is a different topic and consumer group. I've reduced the
@adrianchen-es not much yet. But I've figured out a way to run the tests locally the way they are run in CI (and they indeed fail). I'm trying to troubleshoot. then run the command:
@adrianchen-es Comments on the changes (AI generated):

- manifest.yml: `type: metrics` → `type: logs`. The Filebeat kafka input is classified by Fleet as a logs-type input regardless of what the manifest declares; Fleet's processor always injects `data_stream.type: logs` into every event.
- manifest.yml: remove `source_mode: synthetic` and `index_mode: time_series`. `index_mode: time_series` (TSDB) requires every document to contain all routing dimension fields; missing dimensions are rejected. `source_mode: synthetic` reconstructs `_source` from stored fields.
- fields.yml: remove `dimension: true` and `metric_type: gauge`. `dimension: true` is the TSDB routing-path annotation; it only makes sense when `index_mode: time_series` is active. With TSDB removed, the field is just a plain keyword. `metric_type: gauge`
- test-kafka_metric-config.yml: `wait_for_data_timeout: 30s` → `5m`. On a fresh stack, the test environment needs to: start Kafka, produce messages, have Kibana finish uploading the package (component templates + ingest pipeline), have Fleet push the
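Taken together, the suggested manifest changes would leave the data stream manifest looking roughly like this (a sketch; everything beyond the fields quoted above is an assumption, and the exact nesting of the removed settings in the real file may differ):

```yaml
# data_stream/metrics/manifest.yml (sketch)
title: Custom Kafka Metrics
type: logs          # Fleet injects data_stream.type: logs for the kafka input
# removed:
#   index_mode: time_series   # TSDB needs all dimension fields on every doc
#   source_mode: synthetic    # rebuilds _source from stored fields
```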
Hey @stefans-elastic. If the failure is due to hardcoding in Filebeat, shouldn't it also fail in my local environment, given it isn't customised? I'll tweak it to align with your patch and see what the CI test returns with
This does override the data stream back to logs: logs-kafka_log.metrics-60924.
```diff
@@ -0,0 +1,289 @@
+title: Custom Kafka Metrics
+type: logs
```
🟢 Low metrics/manifest.yml:2
The manifest declares type: logs on line 2, but this is a metrics data stream (the file path is data_stream/metrics/ and processors at lines 260-274 attempt to patch data_stream.type to metrics). The manifest-level type determines index template selection and data stream routing at the Fleet/Agent level before processors run, so metrics data will be routed to logs-* data streams instead of metrics-*, undermining the PR's goal of enabling custom metric ingestion. Change type: logs to type: metrics so the data stream is correctly routed to metrics-*-* indices.
Suggested change in packages/kafka_log/data_stream/metrics/manifest.yml:
```diff
-type: logs
+type: metrics
```
Standalone input package (WMI-style layout) for the Filebeat Kafka input, with system tests, Docker fixtures, and field mappings for Kafka metadata. Made-with: Cursor
Force-pushed from b99b9a7 to 1d79f04 (Compare)
```handlebars
{{#if ssl}}
ssl:
  {{ssl}}
{{/if}}
```
🟡 Medium input/input.yml.hbs:105
The ssl field template at line 107 ( {{ssl}}) only indents the first line of multi-line YAML content. When ssl contains the default multi-line mapping from manifest.yml, the rendered output has enabled nested under ssl but certificate and key become top-level keys instead of nested. This produces malformed YAML that the Kafka input fails to parse correctly.
```diff
+{{#if ssl}}
+ssl: {{ssl}}
+{{/if}}
```
@stefans-elastic, not to waste your time: is it currently impossible to have metrics as the data_stream.type?
```handlebars
{{#if ssl}}
ssl: {{ssl}}
{{/if}}
```
🟠 High input/input.yml.hbs:105
Line 106 outputs ssl: {{ssl}} on a single line, but ssl is a multi-line YAML map. This produces invalid YAML like ssl: enabled: false\ncertificate: "/etc/pki/client/cert.pem" when rendered, causing the Kafka input configuration to fail parsing when SSL is configured. The ssl value needs to be nested under ssl: with proper indentation.
```diff
 {{#if ssl}}
-ssl: {{ssl}}
+ssl:
+  {{ssl}}
 {{/if}}
```
💚 Build Succeeded
History
@adrianchen-es |
Hey @stefans-elastic, could you help with the question from here?
The aim is to have a method to ingest metrics from the Kafka input. If that is hard-coded and not possible, I will close this (or pause it and look at the package-spec/filebeat).
@adrianchen-es |
@agithomas any chance you could help us clarify this? |
@stefans-elastic, @adrianchen-es, please take a look at the references mentioned below.
Based on my understanding, this applies to input packages only. Also, to take advantage, the stack must be
Thank you @agithomas |





Proposed commit message
Enhance the Kafka log integration to allow ingestion of metrics.
The pattern implemented aligns with other integrations that allow both log and metric ingestions.
A dynamic type is not used, as it would require existing users to recreate the integration; a separate data stream reduces the potential complication and allows existing integrations to enable metrics or switch.
Checklist
changelog.yml file.
Author's Checklist
How to test this PR locally
Related issues
Closes #18264
Screenshots