
[kafka] Initial commit to enable metric ingestion via Kafka input#18266

Open
adrianchen-es wants to merge 3 commits into elastic:main from adrianchen-es:18264-ac-kafka-multi_signal

Conversation


@adrianchen-es adrianchen-es commented Apr 8, 2026

Proposed commit message

Enhance the Kafka log integration to allow ingestion of metrics.

The pattern implemented aligns with other integrations that allow both log and metric ingestion.
A dynamic type is not used because it would require existing users to recreate the integration, whereas a separate data stream avoids that complication and lets existing installations enable metrics or switch over.

Checklist

  • I have reviewed tips for building integrations and this pull request is aligned with them.
  • I have verified that all data streams collect metrics or logs.
  • I have added an entry to my package's changelog.yml file.
  • I have verified that Kibana version constraints are current according to guidelines.
  • I have verified that any added dashboard complies with Kibana's Dashboard good practices

Author's Checklist

  • [ ]

How to test this PR locally

Related issues

Closes #18264

Screenshots

@adrianchen-es adrianchen-es added enhancement New feature or request >enhancement labels Apr 8, 2026

github-actions bot commented Apr 8, 2026

Vale Linting Results

Summary: 3 suggestions found

💡 Suggestions (3)
  • packages/kafka_input/docs/README.md, line 13, Elastic.WordChoice: Consider using 'refer to if it's a document, view if it's a UI element' instead of 'See', unless the term is in the UI.
  • packages/kafka_input/docs/README.md, line 19, Elastic.Ellipses: In general, don't use an ellipsis.
  • packages/kafka_input/docs/README.md, line 19, Elastic.Ellipses: In general, don't use an ellipsis.

The Vale linter checks documentation changes against the Elastic Docs style guide.

To use Vale locally or report issues, refer to Elastic style guide for Vale.

@adrianchen-es adrianchen-es self-assigned this Apr 8, 2026
@adrianchen-es adrianchen-es marked this pull request as ready for review April 8, 2026 04:51
@adrianchen-es adrianchen-es requested a review from a team as a code owner April 8, 2026 04:51
@agithomas (Contributor)

Could you also add the recommended pipeline tests as part of the PR.

++ @stefans-elastic for reviewing the PR.

cc @lalit-satapathy

@adrianchen-es adrianchen-es changed the title [kafka log] Initial commit to enable metric ingestion for Kafka [kafka log] Initial commit to enable custom metric ingestion for Kafka Apr 8, 2026
@adrianchen-es adrianchen-es requested a review from a team as a code owner April 8, 2026 09:54
Comment thread packages/kafka_log/_dev/deploy/docker/docker-compose.yml Outdated
@andrewkroh andrewkroh added documentation Improvements or additions to documentation. Applied to PRs that modify *.md files. Integration:kafka_log Custom Kafka Logs Team:Obs-InfraObs Observability Infrastructure Monitoring team [elastic/obs-infraobs-integrations] labels Apr 8, 2026
elastic-vault-github-plugin-prod bot commented Apr 8, 2026

🚀 Benchmarks report

To see the full report comment with /test benchmark fullreport

@adrianchen-es (Contributor, Author)

/test


elasticmachine commented Apr 9, 2026

@adrianchen-es (Contributor, Author)

I am not 100% clear on the test failures for the integration test for kafka_log - https://buildkite.com/elastic/integrations/builds/41142#019d7164-02ba-44b9-96d7-9e5ed4848465, as a local test succeeds.
[screenshot]

@adrianchen-es (Contributor, Author)

/test Check integrations kafka_log

@adrianchen-es (Contributor, Author)

/test stack 9.3.2

@adrianchen-es (Contributor, Author)

@stefans-elastic, any pointers on the integration test error or other changes that are required to progress this?
Just re-ran elastic-package test system -v within the kafka_log package locally, and it passes.

[screenshot]

@stefans-elastic (Contributor)

@adrianchen-es

Unfortunately I wasn't able to test it locally (because The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested), and I don't see an obvious reason for the failure. However, I got AI to look at it and here is the result (I hope it is helpful):

Root Cause: kafka_log.metrics CI test failure

In CI, all datastream tests in a package share the same Kafka broker, while locally each test gets its own fresh broker. The generic test runs first and consumes all messages from
testTopic under group_id: system_test, committing the offset to the end of the log. When the metrics test starts with the identical topics: [testTopic] + group_id: system_test, Kafka
returns the committed offset — nothing to read — and the test times out after 642s.

Fix: At minimum, change group_id: system_test → group_id: system_test_metrics in data_stream/metrics/_dev/test/system/test-kafka-config.yml. Ideally also add a dedicated kafka-metrics
docker service writing metric-shaped JSON ({"metric":{"name":"cpu.usage","value":0.42}}) to a separate testTopicMetrics topic to fully isolate the two tests.
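As a sketch of the minimal fix suggested above (the surrounding keys are assumed from the kafka_log system-test layout, not copied from the actual file):

```yaml
# data_stream/metrics/_dev/test/system/test-kafka-config.yml (hypothetical sketch)
service: kafka-svc
input: kafka
data_stream:
  vars:
    topics:
      - testTopicMetrics          # dedicated topic so the logs test can't drain it first
    group_id: system_test_metrics # distinct consumer group; committed offsets no longer collide
```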

@adrianchen-es (Contributor, Author)

@adrianchen-es

Unfortunately I wasn't able to test it locally (because The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested), and I don't see an obvious reason for the failure. However, I got AI to look at it and here is the result (I hope it is helpful):

Root Cause: kafka_log.metrics CI test failure

In CI, all datastream tests in a package share the same Kafka broker, while locally each test gets its own fresh broker. The generic test runs first and consumes all messages from testTopic under group_id: system_test, committing the offset to the end of the log. When the metrics test starts with the identical topics: [testTopic] + group_id: system_test, Kafka returns the committed offset — nothing to read — and the test times out after 642s.

Fix: At minimum, change group_id: system_test → group_id: system_test_metrics in data_stream/metrics/_dev/test/system/test-kafka-config.yml. Ideally also add a dedicated kafka-metrics docker service writing metric-shaped JSON ({"metric":{"name":"cpu.usage","value":0.42}}) to a separate testTopicMetrics topic to fully isolate the two tests.

@stefans-elastic, that was already done, but the results were the same - c787fbb
I reverted that as it made no material difference (at least for the non-local test).

Both local logs and the CI test logs suggest new containers were created.

[screenshots]

I will nevertheless update the consumer group and see if the CI improves.


adrianchen-es commented Apr 10, 2026

Tried with a different consumer group again with the same CI results.
[screenshot]

Comment thread packages/kafka_input/manifest.yml
Comment thread packages/kafka_log/data_stream/metrics/_dev/deploy/docker/docker-compose.yml Outdated
@adrianchen-es (Contributor, Author)

@stefans-elastic, any insight? The test container is now at the data stream level, the test data is now tailored, and there is a different topic and consumer group. I've reduced the wait_for_data_timeout since successes are within 10-15s.

@stefans-elastic (Contributor)

@adrianchen-es not much yet. But I've figured out a way to run the tests locally the way they are run in CI (and they indeed fail). I'm trying to troubleshoot.
I can share the setup in case you want to reproduce this locally:
file local-test.env:

export YQ_VERSION=v4.35.2
export SETUP_GVM_VERSION=v0.6.0
export JQ_VERSION=1.7
export GH_CLI_VERSION=2.29.0
export KIND_VERSION=v0.27.0
export K8S_VERSION=v1.33.0
export SERVERLESS=false
export STACK_VERSION=8.17.0
export UPLOAD_SAFE_LOGS=0

then run the command:

source local-test.env && .buildkite/scripts/test_one_package.sh kafka_log

@stefans-elastic (Contributor)

@adrianchen-es
I've managed to get it working locally. Here is the patch:

diff --git a/packages/kafka_log/data_stream/metrics/_dev/test/system/test-kafka_metric-config.yml b/packages/kafka_log/data_stream/metrics/_dev/test/system/test-kafka_metric-config.yml
index c428ce9a9b..451896f0c2 100644
--- a/packages/kafka_log/data_stream/metrics/_dev/test/system/test-kafka_metric-config.yml
+++ b/packages/kafka_log/data_stream/metrics/_dev/test/system/test-kafka_metric-config.yml
@@ -1,6 +1,6 @@
 service: kafka-svc
 input: kafka
-wait_for_data_timeout: 30s
+wait_for_data_timeout: 5m
 assert:
   min_count: 2
 data_stream:
diff --git a/packages/kafka_log/data_stream/metrics/fields/fields.yml b/packages/kafka_log/data_stream/metrics/fields/fields.yml
index d97c400b0d..1be49ebe18 100644
--- a/packages/kafka_log/data_stream/metrics/fields/fields.yml
+++ b/packages/kafka_log/data_stream/metrics/fields/fields.yml
@@ -6,11 +6,9 @@
   fields:
     - name: name
       type: keyword
-      dimension: true
       description: Metric name extracted from the JSON payload.
     - name: value
       type: double
-      metric_type: gauge
       description: Metric value extracted from the JSON payload.
 - name: kafka.headers
   description: Included Kafka headers
diff --git a/packages/kafka_log/data_stream/metrics/manifest.yml b/packages/kafka_log/data_stream/metrics/manifest.yml
index ab31ed01e0..14ffa3fc3d 100644
--- a/packages/kafka_log/data_stream/metrics/manifest.yml
+++ b/packages/kafka_log/data_stream/metrics/manifest.yml
@@ -1,5 +1,5 @@
 title: Custom Kafka Metrics
-type: metrics
+type: logs
 streams:
   - input: kafka
     description: Collect metric data from Kafka topic with Elastic Agent.
@@ -272,5 +272,3 @@ streams:
 elasticsearch:
   dynamic_dataset: true
   dynamic_namespace: true
-  source_mode: synthetic
-  index_mode: time_series

Comments on the changes (AI generated):

manifest.yml: type: metrics → type: logs

The Filebeat kafka input is classified by Fleet as a logs-type input regardless of what the manifest declares. Fleet's processor always injects data_stream.type: logs into every event.
Meanwhile, elastic-package generates a component template from the manifest's declared type, setting data_stream.type as a constant_keyword with value: metrics. When a document arrives
with type: logs but the index template expects type: metrics, Elasticsearch rejects it with a 400. Because data streams are created lazily on the first successful document, no document
ever lands, the data stream is never created, and the test sees 0 hits. Changing the declared type to logs aligns the component template with what Fleet actually sends.
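To illustrate the mismatch described above: the component template generated from a manifest declaring `type: metrics` pins `data_stream.type` roughly like this (an illustrative fragment, not the exact generated template):

```json
{
  "template": {
    "mappings": {
      "properties": {
        "data_stream": {
          "properties": {
            "type": { "type": "constant_keyword", "value": "metrics" }
          }
        }
      }
    }
  }
}
```

A document arriving with `data_stream.type: logs` conflicts with the pinned `value: metrics` and is rejected with a 400, so the lazily created data stream never comes into existence.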


manifest.yml: remove source_mode: synthetic and index_mode: time_series

index_mode: time_series (TSDB) requires every document to contain all routing dimension fields — missing dimensions are rejected. source_mode: synthetic reconstructs _source from stored
fields rather than storing it verbatim, which requires strict mapping compatibility. Both are advanced modes intended for purpose-built metrics pipelines (counters, gauges with fixed
cardinality). A kafka log input that relays arbitrary JSON messages is not that pipeline — these modes only add rejection surface without benefit.
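As an illustration of the TSDB constraint mentioned above, `index_mode: time_series` derives a routing path from the fields marked `dimension: true`, and any document missing those fields cannot be routed to a time-series shard (an illustrative fragment, assuming `metric.name` is the only dimension):

```yaml
# Index settings implied by the removed manifest options (illustrative)
index:
  mode: time_series
  routing_path: ["metric.name"]  # built from fields marked dimension: true
# An arbitrary JSON message relayed from Kafka may not contain metric.name,
# so it would be rejected at index time rather than stored.
```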


fields.yml: remove dimension: true and metric_type: gauge

dimension: true is the TSDB routing path annotation — it only makes sense when index_mode: time_series is active. With TSDB removed, the field is just a plain keyword. metric_type: gauge
is an ES mapping parameter that only has meaning (and causes issues) inside a TSDB index. Removing both keeps the field definitions clean and avoids mapping conflicts if the package is
ever installed against a stack that enforces stricter TSDB validation.


test-kafka_metric-config.yml: wait_for_data_timeout: 30s → 5m

On a fresh stack, the test environment needs to: start Kafka, produce messages, have Kibana finish uploading the package (component templates + ingest pipeline), have Fleet push the
policy to the agent, have the agent connect to Kafka and consume, and have Elasticsearch index at least 2 documents. 30 seconds is not enough headroom for all of that, especially on CI
where Kibana's Fleet upload rate-limiter can delay package installation. 5 minutes gives the stack enough time to stabilize before the assertion fires.

@adrianchen-es (Contributor, Author)

Hey @stefans-elastic.
Doesn't changing it to the log type defeat the purpose of the PR? Or would the data stream type remain?

If the failure is due to hardcoding in Filebeat, shouldn't it also fail in my local environment, given it isn't customised?

I'll tweak it to align with your patch and see what the CI test returns.

@adrianchen-es (Contributor, Author)

This does override the data stream back to logs - logs-kafka_log.metrics-60924.

Comment thread packages/kafka_log/data_stream/metrics/elasticsearch/ingest_pipeline/default.yml Outdated
@@ -0,0 +1,289 @@
title: Custom Kafka Metrics
type: logs

🟢 Low metrics/manifest.yml:2

The manifest declares type: logs on line 2, but this is a metrics data stream (the file path is data_stream/metrics/ and processors at lines 260-274 attempt to patch data_stream.type to metrics). The manifest-level type determines index template selection and data stream routing at the Fleet/Agent level before processors run, so metrics data will be routed to logs-* data streams instead of metrics-*, undermining the PR's goal of enabling custom metric ingestion. Change type: logs to type: metrics so the data stream is correctly routed to metrics-*-* indices.

Suggested change:
-type: logs
+type: metrics

@adrianchen-es adrianchen-es changed the title [kafka log] Initial commit to enable custom metric ingestion for Kafka [kafka] Initial commit to enable metric ingestion via Kafka input Apr 13, 2026
@adrianchen-es adrianchen-es removed the Integration:kafka_log Custom Kafka Logs label Apr 13, 2026
Standalone input package (WMI-style layout) for the Filebeat Kafka input,
with system tests, Docker fixtures, and field mappings for Kafka metadata.

Made-with: Cursor
@adrianchen-es adrianchen-es force-pushed the 18264-ac-kafka-multi_signal branch from b99b9a7 to 1d79f04 Compare April 13, 2026 11:17
@adrianchen-es adrianchen-es requested a review from a team as a code owner April 13, 2026 11:17
Comment on lines +105 to +108
{{#if ssl}}
ssl:
{{ssl}}
{{/if}}

🟡 Medium input/input.yml.hbs:105

The ssl field template at line 107 ( {{ssl}}) only indents the first line of multi-line YAML content. When ssl contains the default multi-line mapping from manifest.yml, the rendered output has enabled nested under ssl but certificate and key become top-level keys instead of nested. This produces malformed YAML that the Kafka input fails to parse correctly.

+{{#if ssl}}
+ssl: {{ssl}}
+{{/if}}

@andrewkroh andrewkroh added the New Integration Issue or pull request for creating a new integration package. label Apr 13, 2026
@adrianchen-es (Contributor, Author)

@stefans-elastic, not to waste your time:

Is it currently impossible to have metrics as the data_stream.type?
I've attempted to mimic how the WMI input works, and the UI appears to have a selector for the data_stream.type, but it seems that it still ends up as logs.

Comment on lines +105 to +107
{{#if ssl}}
ssl: {{ssl}}
{{/if}}

🟠 High input/input.yml.hbs:105

Line 106 outputs ssl: {{ssl}} on a single line, but ssl is a multi-line YAML map. This produces invalid YAML like ssl: enabled: false\ncertificate: "/etc/pki/client/cert.pem" when rendered, causing the Kafka input configuration to fail parsing when SSL is configured. The ssl value needs to be nested under ssl: with proper indentation.

 {{#if ssl}}
-ssl: {{ssl}}
+ssl: 
+  {{ssl}}
 {{/if}}
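Both review-bot findings point at the same root cause: a multi-line value interpolated into YAML must have every line indented under its key, not just the first. A minimal Python sketch of that indentation logic (the function name is illustrative, not the actual Handlebars helper used by integrations):

```python
def render_ssl_block(ssl_value: str) -> str:
    """Nest a multi-line YAML value under an `ssl:` key by indenting every line."""
    if not ssl_value:
        return ""  # mirrors {{#if ssl}}: omit the block entirely when unset
    indented = "\n".join("  " + line for line in ssl_value.splitlines())
    return "ssl:\n" + indented + "\n"

# Multi-line default similar to the one described in the review comment
ssl_value = 'enabled: false\ncertificate: "/etc/pki/client/cert.pem"'
print(render_ssl_block(ssl_value))
# ssl:
#   enabled: false
#   certificate: "/etc/pki/client/cert.pem"
```

Indenting only the first line (as the original template did) would leave `certificate` at the top level, producing the malformed YAML the comment describes.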

@elasticmachine

💚 Build Succeeded


cc @adrianchen-es

@stefans-elastic (Contributor)

@adrianchen-es
it seems to me that the kafka_input package is pretty much the same as kafka_log, so is there a reason to introduce it?

@adrianchen-es (Contributor, Author)

@adrianchen-es it seems to me that the kafka_input package is pretty much the same as kafka_log, so is there a reason to introduce it?

Hey @stefans-elastic, could you help with the question from here?

@stefans-elastic , not to waste your time.

Is it currently impossible to have metrics as the data_stream.type? I've attempted to mimic how the WMI input works, and the UI appears to have a selector for the data_stream.type, but it seems that it still ends up as logs.

The aim is to have a method to ingest metrics from the Kafka input. If that is hard-coded and not possible, I will close this (or pause it and look at the package-spec/filebeat).

@stefans-elastic (Contributor)

@adrianchen-es
Unfortunately I don't have a straight answer to this. But given that the kafka input is a Filebeat feature (not a Metricbeat one), it is possible that the answer is no. I'm not sure on this, though, so a second opinion is needed here (maybe @agithomas you can help here?)

@adrianchen-es (Contributor, Author)

@adrianchen-es Unfortunately I don't have a straight answer to this. But given that the kafka input is a Filebeat feature (not a Metricbeat one), it is possible that the answer is no. I'm not sure on this, though, so a second opinion is needed here (maybe @agithomas you can help here?)

@agithomas any chance you could help us clarify this?

@agithomas (Contributor)

@adrianchen-es (Contributor, Author)



Development

Successfully merging this pull request may close these issues.

[Kafka Log]: Configurable data_stream type
