out_opentelemetry: Add resource attributes support #11574
cb645j wants to merge 1 commit into fluent:master
Signed-off-by: BOSLET, CORY <cb645j+ATT@att.com>
📝 Walkthrough
Adds configuration and processing for extracting resource attributes from OpenTelemetry log message bodies. Introduces a new configuration option specifying which message keys should be promoted to OTLP resource attributes, with supporting data structure fields, validation logic during initialization, and runtime extraction during log processing.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3f51519d7c

    if (native_otel == FLB_FALSE &&
        ctx->ra_resource_attributes_message &&
        mk_list_size(ctx->ra_resource_attributes_message) > 0) {
        resource_id = -1;
        scope_id = -1;

Keep resource array growth on per-record reset path
When logs_resource_attributes_message_key is enabled for non-native OTLP input, this new block resets resource_id/scope_id on every event, which forces every standalone record through the goto start_resource path in otel_process_logs. That label sits after the resource-capacity/max-resources checks, so repeated records append to resource_logs[export_logs.n_resource_logs] without reallocation; once a chunk has more than the initial 256 resources, this can write past the allocated array and corrupt memory.
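The concern above comes down to checking capacity before every append, not only on the path that normally starts a resource. A minimal sketch of that pattern follows. The `resource_array` struct and both helper functions are hypothetical stand-ins for illustration, not the plugin's actual data structures, and plain `realloc` is used in place of the Fluent Bit allocator.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for the exporter's resource_logs array. */
struct resource_array {
    void **items;     /* stored entries */
    size_t count;     /* entries in use */
    size_t capacity;  /* allocated slots */
};

/*
 * Make sure one more entry fits. Returns 0 on success, -1 when the
 * hard cap is reached or the allocation fails.
 */
static int resource_array_reserve_one(struct resource_array *arr,
                                      size_t max_items)
{
    void **tmp;
    size_t new_capacity;

    assert(arr != NULL);

    if (arr->count >= max_items) {
        return -1;
    }
    if (arr->count < arr->capacity) {
        return 0;
    }

    /* Double the capacity, clamped to the hard cap. */
    new_capacity = arr->capacity == 0 ? 4 : arr->capacity * 2;
    if (new_capacity > max_items) {
        new_capacity = max_items;
    }

    tmp = realloc(arr->items, new_capacity * sizeof(void *));
    if (tmp == NULL) {
        return -1;
    }

    arr->items = tmp;
    arr->capacity = new_capacity;
    return 0;
}

/* Append only after the capacity check has passed. */
static int resource_array_append(struct resource_array *arr, void *item,
                                 size_t max_items)
{
    if (resource_array_reserve_one(arr, max_items) != 0) {
        return -1;
    }
    arr->items[arr->count++] = item;
    return 0;
}
```

If every code path that creates a resource goes through one append helper like this, a jump that skips the earlier capacity checks can no longer write past the end of the array.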
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugins/out_opentelemetry/opentelemetry_logs.c`:
- Around line 784-939: The function set_resource_attributes_from_message_body is
being called during resource creation when handling FLB_LOG_EVENT_GROUP_START,
but at that time event.body is the group descriptor (not per-record bodies), so
resource attributes derived from message keys miss grouped records; move or
duplicate the message-body key lookup into the per-record handling path (the
code that processes FLB_LOG_EVENT_NORMAL records) so you call
set_resource_attributes_from_message_body (or its core lookup logic) using the
actual per-record msgpack body before resource selection/assignment; update
callers (remove or guard the call during GROUP_START) and ensure the lookup uses
ctx->ra_resource_attributes_message and the per-record msgpack_object body so
grouped OTLP records get their resource attributes populated.
- Around line 1182-1193: The current code forces resource_id and scope_id to -1
for every non-OTLP record when ctx->ra_resource_attributes_message is set, which
breaks batching by resetting log_record_count per record; change the logic in
the block that manipulates resource_id/scope_id so it does NOT unconditionally
reset per record—either (a) only reset resource_id/scope_id when the promoted
resource attributes actually change compared to the last record (store and
compare the last promoted resource signature), or (b) respect an export-wide
flush boundary by tracking an export counter and only forcing a new
resource/scope when that counter reaches ctx->batch_size (or when attributes
change); update uses of resource_id, scope_id, and log_record_count to follow
this rule so ctx->batch_size and the flush threshold can be honored while still
isolating records with different promoted attributes.
📒 Files selected for processing (4)
- plugins/out_opentelemetry/opentelemetry.c
- plugins/out_opentelemetry/opentelemetry.h
- plugins/out_opentelemetry/opentelemetry_conf.c
- plugins/out_opentelemetry/opentelemetry_logs.c

    /*
     * For each key name in ctx->ra_resource_attributes_message_list, look it up in
     * the msgpack message body and promote the value to an OTLP resource attribute.
     */
    static void set_resource_attributes_from_message_body(
        struct opentelemetry_context *ctx,
        msgpack_object *body,
        Opentelemetry__Proto__Resource__V1__Resource *resource)
    {
        int i;
        size_t key_len;
        size_t map_key_len;
        char *map_key_ptr;
        struct mk_list *head;
        struct flb_config_map_val *mv;
        struct flb_slist_entry *entry;
        const char *normalized_key;
        msgpack_object_kv *kv;
        Opentelemetry__Proto__Common__V1__KeyValue *attr;
        Opentelemetry__Proto__Common__V1__KeyValue **tmp_attrs;

        /*
         * Use ctx->ra_resource_attributes_message directly - this is the pointer
         * managed by the Fluent Bit config-map framework and is reliably available
         * in every worker thread context, unlike an embedded mk_list copy.
         */
        if (!ctx->ra_resource_attributes_message ||
            mk_list_size(ctx->ra_resource_attributes_message) == 0) {
            return;
        }

        if (body == NULL || body->type != MSGPACK_OBJECT_MAP) {
            return;
        }

        /* Iterate directly over the config-map-managed list */
        flb_config_map_foreach(head, mv, ctx->ra_resource_attributes_message) {
            if (mk_list_size(mv->val.list) != 1) {
                continue;
            }

            entry = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head);
            normalized_key = entry->str;
            key_len = flb_sds_len(entry->str);

            if (key_len == 0) {
                continue;
            }

            /*
             * Allow optional record accessor prefix so both "service.name" and
             * "$service.name" are treated as the same map key.
             */
            if (key_len > 0 && normalized_key[0] == '$') {
                normalized_key++;
                key_len--;
            }

            /*
             * Also tolerate bracket forms like $['service.name'] and
             * $["service.name"] for literal keys.
             */
            if (key_len >= 4 && normalized_key[0] == '[') {
                if ((normalized_key[1] == '\'' && normalized_key[key_len - 2] == '\'' &&
                     normalized_key[key_len - 1] == ']') ||
                    (normalized_key[1] == '"' && normalized_key[key_len - 2] == '"' &&
                     normalized_key[key_len - 1] == ']')) {
                    normalized_key += 2;
                    key_len -= 4;
                }
            }

            if (key_len == 0) {
                continue;
            }

            /* tolerate quoted key names like "service.name" or 'service.name' */
            if (key_len >= 2) {
                if ((normalized_key[0] == '"' && normalized_key[key_len - 1] == '"') ||
                    (normalized_key[0] == '\'' && normalized_key[key_len - 1] == '\'')) {
                    normalized_key++;
                    key_len -= 2;
                }
            }

            if (key_len == 0) {
                continue;
            }

            for (i = 0; i < body->via.map.size; i++) {
                kv = &body->via.map.ptr[i];

                if (kv->key.type == MSGPACK_OBJECT_STR) {
                    map_key_ptr = kv->key.via.str.ptr;
                    map_key_len = kv->key.via.str.size;
                }
                else if (kv->key.type == MSGPACK_OBJECT_BIN) {
                    map_key_ptr = (char *) kv->key.via.bin.ptr;
                    map_key_len = kv->key.via.bin.size;
                }
                else {
                    continue;
                }

                if (map_key_len != key_len) {
                    continue;
                }

                if (strncmp(map_key_ptr, normalized_key, key_len) != 0) {
                    continue;
                }

                /* Found the key - convert to OTLP KeyValue */
                if (kv->key.type == MSGPACK_OBJECT_STR) {
                    attr = msgpack_kv_to_otlp_any_value(kv);
                }
                else {
                    attr = otlp_kvpair_value_initialize();
                    if (attr != NULL) {
                        attr->key = flb_strndup(map_key_ptr, map_key_len);

                        if (attr->key != NULL) {
                            attr->value = msgpack_object_to_otlp_any_value(&kv->val);
                        }

                        if (attr->key == NULL || attr->value == NULL) {
                            otlp_kvpair_destroy(attr);
                            attr = NULL;
                        }
                    }
                }

                if (!attr) {
                    flb_plg_warn(ctx->ins, "resource attributes: failed to convert key '%s' to OTLP KeyValue",
                                 entry->str);
                    break;
                }

                /* Grow the resource attributes array by one slot */
                tmp_attrs = flb_realloc(resource->attributes,
                                        (resource->n_attributes + 1) *
                                        sizeof(Opentelemetry__Proto__Common__V1__KeyValue *));
                if (!tmp_attrs) {
                    flb_plg_error(ctx->ins, "resource attributes: memory allocation failed for key '%s'",
                                  entry->str);
                    otlp_kvpair_destroy(attr);
                    break;
                }

                resource->attributes = tmp_attrs;
                resource->attributes[resource->n_attributes] = attr;
                resource->n_attributes++;
                break;
            }
        }
    }

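The key normalization inside the function above can be read in isolation. The sketch below restates the same three steps (optional `$` prefix, optional bracket form, optional surrounding quotes) as a standalone helper. The names `normalize_key` and `key_equals` are hypothetical and do not appear in the PR. The comparison uses `memcmp`, which is an assumption on my part and not the PR's choice: unlike `strncmp`, it does not stop at an embedded NUL byte, which could matter for `MSGPACK_OBJECT_BIN` keys.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: strip an optional "$" prefix, an optional
 * ['...'] or ["..."] wrapper, and optional surrounding quotes.
 * Returns a pointer into the original buffer and updates *len.
 */
static const char *normalize_key(const char *key, size_t *len)
{
    size_t n;

    assert(key != NULL && len != NULL);
    n = *len;

    /* "$service.name" -> "service.name" */
    if (n > 0 && key[0] == '$') {
        key++;
        n--;
    }

    /* "['service.name']" or "[\"service.name\"]" -> "service.name" */
    if (n >= 4 && key[0] == '[' && key[n - 1] == ']' &&
        (key[1] == '\'' || key[1] == '"') && key[n - 2] == key[1]) {
        key += 2;
        n -= 4;
    }

    /* "'service.name'" or "\"service.name\"" -> "service.name" */
    if (n >= 2 && (key[0] == '\'' || key[0] == '"') && key[n - 1] == key[0]) {
        key++;
        n -= 2;
    }

    *len = n;
    return key;
}

/* Length-aware comparison that is safe for keys containing NUL bytes. */
static int key_equals(const char *a, size_t a_len,
                      const char *b, size_t b_len)
{
    return a_len == b_len && memcmp(a, b, a_len) == 0;
}
```

With this helper, `service.name`, `$service.name`, `$['service.name']`, and `"service.name"` all normalize to the same twelve-byte key, so the result could be computed once at configuration time instead of once per record.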
Grouped OTLP records never read the actual log body here.
This helper is wired into the resource-creation path, so for native OTLP input it runs while handling FLB_LOG_EVENT_GROUP_START. At that point event.body is the group descriptor ($resource / $scope), not the FLB_LOG_EVENT_NORMAL body, so logs_resource_attributes_message_key only works for standalone logs. If grouped OTLP records are meant to be supported too, the lookup needs to happen in the per-record path before resource selection.
Also applies to: 1304-1305
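A rough illustration of the suggested fix: resolve the promoted key while handling each normal record, and skip group markers, because their body is a descriptor and not a log body. Everything in this sketch is hypothetical. The `sketch_event` struct and `SKETCH_*` constants are simplified stand-ins for the `FLB_LOG_EVENT_*` types, and `promoted_value` stands for the result of looking up the configured key in that event's body.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for FLB_LOG_EVENT_GROUP_START / NORMAL / GROUP_END. */
enum sketch_event_type {
    SKETCH_GROUP_START,
    SKETCH_NORMAL,
    SKETCH_GROUP_END
};

struct sketch_event {
    enum sketch_event_type type;
    /* Value of the promoted key in this event's body, or NULL if absent. */
    const char *promoted_value;
};

/*
 * Resolve the promoted value once per log record. Group markers are
 * skipped because their body does not contain the log fields.
 * Returns the number of records written to out.
 */
static size_t resolve_promoted_per_record(const struct sketch_event *events,
                                          size_t n_events,
                                          const char **out,
                                          size_t out_cap)
{
    size_t i;
    size_t n_out = 0;

    assert(events != NULL && out != NULL);

    for (i = 0; i < n_events && n_out < out_cap; i++) {
        if (events[i].type != SKETCH_NORMAL) {
            continue;
        }
        out[n_out++] = events[i].promoted_value;
    }

    return n_out;
}
```

A record whose body lacks the key yields `NULL` here. Whether such a record should then fall back to the group's resource is a design decision the review comment leaves open.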

    /*
     * For standalone records (non-native OTLP groups), resource attributes
     * promoted from message keys are record-specific. Force a fresh
     * resource/scope context per record when this feature is enabled to
     * avoid carrying stale values across subsequent log lines.
     */
    if (native_otel == FLB_FALSE &&
        ctx->ra_resource_attributes_message &&
        mk_list_size(ctx->ra_resource_attributes_message) > 0) {
        resource_id = -1;
        scope_id = -1;
    }

This turns batch_size into a no-op for standalone logs.
With logs_resource_attributes_message_key configured, every non-OTLP record forces resource_id and scope_id back to -1, so a new resource/scope is created and log_record_count is reset on every record. That makes the flush threshold at Line 1507 unreachable for the common batch_size > 1 case, while each forced scope still allocates a ctx->batch_size-slot log_records buffer at Line 1398. Please add an export-wide flush boundary for this mode, or otherwise cap the per-record resource split.
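One way to cap the per-record split is to start a new resource only when the promoted values differ from those of the previous record. The sketch below assumes the promoted values are already available as C strings and summarizes them with a 64-bit FNV-1a hash. `promoted_signature`, `batch_state`, and `batch_needs_new_resource` are hypothetical names, not part of the plugin. A production version would likely also compare the actual values when signatures match, since hashes can collide.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Fold len bytes into a running 64-bit FNV-1a hash. */
static uint64_t fnv1a_update(uint64_t h, const char *data, size_t len)
{
    size_t i;

    for (i = 0; i < len; i++) {
        h ^= (unsigned char) data[i];
        h *= 1099511628211ULL;  /* FNV-1a 64-bit prime */
    }
    return h;
}

/*
 * Signature of the promoted values for one record. A marker byte
 * distinguishes a missing value (NULL) from an empty string.
 */
static uint64_t promoted_signature(const char *const *values, size_t n)
{
    uint64_t h = 14695981039346656037ULL;  /* FNV-1a 64-bit offset basis */
    size_t i;

    for (i = 0; i < n; i++) {
        if (values[i] == NULL) {
            h = fnv1a_update(h, "\x00", 1);
        }
        else {
            h = fnv1a_update(h, "\x01", 1);
            h = fnv1a_update(h, values[i], strlen(values[i]));
        }
        h = fnv1a_update(h, "\x1f", 1);  /* field separator */
    }
    return h;
}

/* Hypothetical per-chunk state carried across records. */
struct batch_state {
    uint64_t last_signature;
    int has_signature;
    size_t resources_started;
};

/* Returns 1 when a fresh resource/scope is needed, 0 to keep batching. */
static int batch_needs_new_resource(struct batch_state *st, uint64_t sig)
{
    assert(st != NULL);

    if (st->has_signature && st->last_signature == sig) {
        return 0;
    }
    st->last_signature = sig;
    st->has_signature = 1;
    st->resources_started++;
    return 1;
}
```

Under this scheme, consecutive records with identical promoted values share one resource and scope, so `log_record_count` can still reach `batch_size`. Streams that alternate between values on every record would still split per record and would need the export-wide flush boundary the comment also suggests.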
Link to docs PR: fluent/fluent-bit-docs#2405
@cosmo0920 the failures here are unrelated to my changes. Can you please review?
This is related to issue #11491
Link to docs PR: fluent/fluent-bit-docs#2405
Enter `[N/A]` in the box if an item is not applicable to your change.
Testing
Before we can approve your change, please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
`ok-package-test` label to test for all targets (requires maintainer to do).
Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Example Configuration
Debug log output
[2026/03/02 15:30:38.171218000] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
{"date":1772483437.169789,"time":"2026-03-02T15:35:24.315873000Z","severity":"debug","severity_number":"7","msg":"Dev pr-11510 test2","subsys":"identity-cache-cell","deployment.environment.name":"dev","service.name":"my-app-log-svc5","application":"TESTAPP-19999"}
Documentation
Otel Log Data Model fields now supported:
Valgrind
coming soon