Conversation
> Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.

No actionable comments were generated in the recent review. 🎉
📝 Walkthrough

Adds configurable OTLP log partitioning by resource for the Kafka output plugin: a new config flag, partition data structures, msgpack/hash helpers, a keyed raw producer, a partitioned production flow wired into the OTLP JSON/proto flush paths, config registration, and integration tests/configs.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant FlushCaller
    participant Decoder
    participant PartitionMap
    participant Converter
    participant KafkaProducer
    FlushCaller->>Decoder: decode OTLP logs chunk
    Decoder->>PartitionMap: compute resource hash, assign record
    PartitionMap->>PartitionMap: buffer record per resource
    PartitionMap->>Converter: convert each partition buffer to OTLP JSON/proto
    Converter->>KafkaProducer: produce_raw_payload_with_key(payload, key)
    KafkaProducer-->>FlushCaller: produce result
```
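The "compute resource hash" step in the diagram can be sketched minimally. This is not the plugin's actual implementation: the sketch applies FNV-1a to a flat string of resource attributes, whereas the real code hashes a msgpack encoding via helper functions, and `resource_hash` is a hypothetical name.

```c
#include <stdint.h>
#include <stddef.h>

/* Hypothetical stand-in for the plugin's resource hashing: FNV-1a
 * over a flat string of resource attributes (the real code hashes a
 * msgpack encoding of the attributes). */
static uint64_t resource_hash(const char *attrs, size_t len)
{
    uint64_t h = 1469598103934665603ULL;   /* FNV offset basis */
    size_t i;

    for (i = 0; i < len; i++) {
        h ^= (uint8_t) attrs[i];
        h *= 1099511628211ULL;             /* FNV prime */
    }
    return h;
}
```

Records whose resource attributes serialize identically get the same hash, so they are buffered in the same partition and produced under the same Kafka message key.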
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b5ae018aa4
```c
ret = produce_raw_payload_with_key(payload,
                                   flb_sds_len(payload),
                                   key,
                                   key_len,
                                   ctx);
```
Avoid retrying after partially producing partitions
When `otlp_logs_partition_by_resource` creates multiple resource partitions, this loop produces them one by one and immediately returns `FLB_RETRY`/`FLB_ERROR` if a later `rd_kafka_produce` fails (for example, the local Kafka queue becomes full). Any earlier partitions have already been enqueued, but the engine will retry the original chunk as a whole, so those resource payloads are sent again and consumers receive duplicates whenever only a suffix of the partition list fails.
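The at-most-once workaround this implies can be sketched independently of the plugin (the return codes and the `produce_fn` callback below are stand-ins, not Fluent Bit APIs): once any partition has been enqueued, a retryable failure is downgraded to a hard error so the engine does not replay the chunk and duplicate the earlier partitions.

```c
#include <stddef.h>

/* Stand-ins for Fluent Bit's engine return codes (assumed values). */
#define SKETCH_OK     0
#define SKETCH_RETRY  1
#define SKETCH_ERROR  2

/*
 * Produce each partition payload in order. If a later produce fails
 * after at least one partition was already enqueued, downgrade RETRY
 * to ERROR: replaying the whole chunk would duplicate partitions
 * 0..i-1 that Kafka already accepted.
 */
static int produce_partitions(int (*produce_fn)(size_t idx), size_t count)
{
    size_t i;
    int ret;

    for (i = 0; i < count; i++) {
        ret = produce_fn(i);
        if (ret != SKETCH_OK) {
            if (i > 0 && ret == SKETCH_RETRY) {
                return SKETCH_ERROR;   /* partial send: no safe replay */
            }
            return ret;
        }
    }
    return SKETCH_OK;
}

/* Failure injector for demonstration: fails with RETRY at one index. */
static size_t fail_at;
static int fake_produce(size_t idx)
{
    return (idx == fail_at) ? SKETCH_RETRY : SKETCH_OK;
}
```

A failure on the very first partition is still reported as retryable, since nothing has been enqueued yet and a replay is harmless.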
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around lines 875-908: the local variable `scope_id` in `get_otlp_group_resource` is assigned but never used, which trips `-Werror` with `-Wunused-variable`. Either remove the variable and call `msgpack_map_get_int64(&group_metadata->via.map, "scope_id", NULL)` if the API accepts a NULL destination, or keep the variable and add `(void)scope_id` after the call to suppress the unused-variable warning.
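As a standalone illustration of the second option, the `(void)` cast idiom marks a variable as deliberately unused so `-Werror` builds stay clean. The `fetch_scope_id` helper below is hypothetical, standing in for `msgpack_map_get_int64`:

```c
#include <stdint.h>
#include <stddef.h>

/* Hypothetical stand-in for msgpack_map_get_int64(): writes the
 * looked-up value through `out` when a destination is supplied. */
static int fetch_scope_id(int64_t *out)
{
    if (out != NULL) {
        *out = 42;
    }
    return 0;   /* 0 = success */
}

/* The caller needs the return code but not the value itself; the
 * (void) cast documents that and silences unused-variable warnings. */
static int lookup_scope_id_status(void)
{
    int64_t scope_id;
    int ret;

    ret = fetch_scope_id(&scope_id);
    (void) scope_id;

    return ret;
}
```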
ℹ️ Review info
⚙️ Run configuration — Configuration used: defaults; Review profile: CHILL; Plan: Pro; Run ID: 5137eee9-650d-43f2-8ff9-6ab599b64c29
📒 Files selected for processing (5):
- plugins/out_kafka/kafka.c
- plugins/out_kafka/kafka_config.h
- tests/integration/scenarios/out_kafka/config/out_kafka_otlp_json_partition_by_resource.yaml
- tests/integration/scenarios/out_kafka/config/out_kafka_otlp_proto_partition_by_resource.yaml
- tests/integration/scenarios/out_kafka/tests/test_out_kafka_001.py
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/out_kafka/kafka.c (1)
557-579: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win — Bound the queue-full loop even when engine retry is disabled.

After the first partition is accepted, `allow_engine_retry` becomes false, so lines 557-559 never fire and the `QUEUE_FULL` path loops forever. If Kafka stays backed up after a partial send, this flush can block indefinitely instead of failing fast without duplicating already-enqueued partitions.

Suggested fix:

```diff
 retry:
-    if (allow_engine_retry == FLB_TRUE &&
-        ctx->queue_full_retries > 0 &&
-        queue_full_retries >= ctx->queue_full_retries) {
+    if (ctx->queue_full_retries > 0 &&
+        queue_full_retries >= ctx->queue_full_retries) {
         ctx->blocked = FLB_FALSE;
-        return FLB_RETRY;
+        return allow_engine_retry == FLB_TRUE ? FLB_RETRY : FLB_ERROR;
     }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/out_kafka/kafka.c` around lines 557 - 579, The queue-full retry loop can spin forever when allow_engine_retry becomes false after a partial send; modify the QUEUE_FULL handling in the rd_kafka_produce branch to always check and increment queue_full_retries and compare against ctx->queue_full_retries (or a safe max) before retrying, regardless of allow_engine_retry, and if the max is reached set ctx->blocked = FLB_FALSE and return FLB_RETRY (or fail fast) instead of looping; update the logic around rd_kafka_produce, RD_KAFKA_RESP_ERR__QUEUE_FULL, queue_full_retries and ctx->blocked so the loop is bounded even when allow_engine_retry == FLB_FALSE.
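A self-contained sketch of the bounded behavior being asked for (the `FLB_*` codes and the `FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES` fallback are modeled as local constants, and the poll/backoff step inside the loop is elided):

```c
#include <stddef.h>

#define SKETCH_OK     0
#define SKETCH_RETRY  1
#define SKETCH_ERROR  2

/* Fallback ceiling applied when no user limit is configured but engine
 * retry has been ruled out (e.g. after a partial partitioned send). */
#define PARTIAL_QUEUE_FULL_RETRIES 10

/*
 * Bounded queue-full loop: attempts are always counted against a
 * limit, so the loop cannot spin forever even when engine retry is
 * disallowed. On exhaustion, return RETRY only when the engine may
 * safely replay the chunk, ERROR otherwise.
 */
static int drain_queue_full(int (*queue_is_full)(void),
                            int configured_limit,
                            int allow_engine_retry)
{
    int limit = configured_limit;
    int attempts = 0;

    if (limit <= 0) {
        limit = PARTIAL_QUEUE_FULL_RETRIES;
    }

    while (queue_is_full()) {
        attempts++;
        if (attempts >= limit) {
            return allow_engine_retry ? SKETCH_RETRY : SKETCH_ERROR;
        }
        /* real code would rd_kafka_poll() and back off here */
    }
    return SKETCH_OK;
}

/* Probe for demonstration: reports "full" a fixed number of times. */
static int full_for;
static int probe_queue(void)
{
    if (full_for > 0) {
        full_for--;
        return 1;
    }
    return 0;
}
```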
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 541-548: The code currently treats key == NULL as equivalent to
ctx->message_key, which causes partitions that should be unkeyed to inherit the
static key; change the logic so that a NULL key is left NULL for partitions that
cannot derive a resource hash: keep the existing wrapper behavior for the
default-key path, but when invoking the partitioned-send path ensure you pass
use_default_key = FLB_FALSE whenever partition->has_key != FLB_TRUE (so key
remains NULL and message_key/message_key_len are not overwritten by
ctx->message_key); update the branch that sets message_key/message_key_len (and
the call site that selects between wrapper vs partitioned path) to honor
use_default_key and partition->has_key accordingly.
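The intended key selection can be sketched with simplified structures. The field names (`has_key`, `key`, `key_len`, `message_key`) follow the comment above, but the structs here are mocks, not the plugin's real types:

```c
#include <stddef.h>
#include <string.h>

/* Simplified stand-ins for the plugin structures (assumed fields). */
struct sketch_partition {
    int         has_key;   /* nonzero when a resource hash was derived */
    const char *key;       /* resource-hash key, NULL otherwise */
    size_t      key_len;
};

struct sketch_ctx {
    const char *message_key;     /* static 'message_key' config value */
    size_t      message_key_len;
};

/*
 * Select the Kafka message key for one partition: a derived resource
 * hash wins; otherwise fall back to the static key only on the
 * default-key path. A partition that could not derive a resource hash
 * and is not on the default-key path stays deliberately unkeyed
 * (NULL key), instead of silently inheriting ctx->message_key.
 */
static const char *select_message_key(const struct sketch_partition *p,
                                      const struct sketch_ctx *ctx,
                                      int use_default_key,
                                      size_t *len_out)
{
    if (p->has_key) {
        *len_out = p->key_len;
        return p->key;
    }
    if (use_default_key) {
        *len_out = ctx->message_key_len;
        return ctx->message_key;
    }
    *len_out = 0;
    return NULL;
}
```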
Actionable comments posted: 1
🧹 Nitpick comments (3)
plugins/out_kafka/kafka.c (3)
1427-1432: ⚡ Quick win — Config entry: consider tightening the description.

The description is clear, but it does not mention the behavioral side-effect that `message_key`/`message_key_field` are effectively superseded for OTLP log chunks when this flag is enabled (each partition is keyed by the resource-attributes hash, or unkeyed when no resource info is available). A single sentence here will save a lot of support questions.

📝 Suggested wording tweak:

```diff
     "When using format otlp_json or otlp_proto, split OTLP log payloads by "
-    "resource and use a hash of the resource attributes as the Kafka message key."
+    "resource and use a hash of the resource attributes as the Kafka message key. "
+    "When enabled, this overrides 'message_key' for OTLP log chunks; partitions "
+    "without a derivable resource are produced without a key."
     },
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/out_kafka/kafka.c` around lines 1427 - 1432, Update the config description for otlp_logs_partition_by_resource in struct flb_out_kafka to explicitly state the behavioral side-effect: when enabled (for formats otlp_json/otlp_proto) OTLP log chunks are partitioned by resource and the Kafka message key is derived from a hash of the resource attributes — this effectively supersedes message_key and message_key_field for those OTLP log chunks, and if no resource information exists the messages will be unkeyed. Keep the sentence concise and add it to the existing description string.
974-1088: ⚡ Quick win — Consider hoisting `default_logs_body_keys` and the option setup.

`default_logs_body_keys` and the `flb_opentelemetry_otlp_logs_options` initialization block are now duplicated across `produce_partitioned_otlp_logs` (974, 1084-1088), `produce_otlp_json` (1165, 1176-1180), and `produce_otlp_proto` (1227, 1236-1240). A small file-scope helper (or a single `static const` array + an `init_otlp_logs_options(...)` helper) would eliminate the drift risk if the defaults ever evolve.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/out_kafka/kafka.c` around lines 974 - 1088, Hoist the duplicated default_logs_body_keys and options initialization into a single shared helper: create a file-scope static const char *default_logs_body_keys[] and an init helper (e.g., init_otlp_logs_options(struct flb_opentelemetry_otlp_logs_options *opts)) that sets logs_require_otel_metadata, logs_body_keys, logs_body_key_count and logs_body_key_attributes; then replace the duplicate blocks in produce_partitioned_otlp_logs, produce_otlp_json, and produce_otlp_proto to call init_otlp_logs_options(&options) and remove their local duplicates so all three functions use the same centralized defaults.
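A sketch of the suggested consolidation. The struct layout and the default key values below are stand-ins for illustration; the real `flb_opentelemetry_otlp_logs_options` definition and its defaults live in Fluent Bit's OpenTelemetry helpers:

```c
#include <stddef.h>

/* Local mock of flb_opentelemetry_otlp_logs_options with the fields
 * named in the review comment (the real layout may differ). */
struct sketch_otlp_logs_options {
    int          logs_require_otel_metadata;
    const char **logs_body_keys;
    size_t       logs_body_key_count;
    int          logs_body_key_attributes;
};

/* Single file-scope copy of the default body keys (values are
 * hypothetical here). */
static const char *default_logs_body_keys[] = {"log", "message"};

/* Shared initializer: every producer path calls this instead of
 * re-declaring the defaults inline, so they cannot drift apart. */
static void init_otlp_logs_options(struct sketch_otlp_logs_options *opts)
{
    opts->logs_require_otel_metadata = 0;
    opts->logs_body_keys = default_logs_body_keys;
    opts->logs_body_key_count =
        sizeof(default_logs_body_keys) / sizeof(default_logs_body_keys[0]);
    opts->logs_body_key_attributes = 1;
}

/* Convenience wrapper used for demonstration. */
static size_t default_body_key_count(void)
{
    struct sketch_otlp_logs_options opts;

    init_otlp_logs_options(&opts);
    return opts.logs_body_key_count;
}
```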
558-560: 💤 Low value — Consider `<= 0` for the unlimited-retries guard.

`queue_full_retries` is documented as "0 or false = no limit". If a user passes a negative value (allowed by `FLB_CONFIG_MAP_INT`), the partitioned path will not apply the `FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES` ceiling because the comparison is `== 0`, leaving the partitioned producer to spin without a fallback bound. Tightening to `<= 0` mirrors the documented "no limit" semantics and makes the partial-retry safety net unconditional.

♻️ Suggested change:

```diff
-    if (queue_full_retry_limit == 0 && allow_engine_retry == FLB_FALSE) {
+    if (queue_full_retry_limit <= 0 && allow_engine_retry == FLB_FALSE) {
         queue_full_retry_limit = FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES;
     }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/out_kafka/kafka.c` around lines 558 - 560, The guard that sets queue_full_retry_limit only when queue_full_retry_limit == 0 should be tightened to handle negative values too: change the conditional that checks queue_full_retry_limit (together with allow_engine_retry) so it uses <= 0 instead of == 0, ensuring the FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES ceiling is applied when queue_full_retry_limit is unset or negative; update the condition referencing queue_full_retry_limit, allow_engine_retry and FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_kafka/kafka.c`:
- Around line 1090-1148: Summarize and document the at-most-once durability
trade-off introduced by otlp_logs_partition_by_resource: explain that when
partitioning OTLP logs by resource the send loop (see
produce_raw_payload_with_key_retry_control usage) intentionally returns
FLB_ERROR (not FLB_RETRY) for failures after one or more partitions have already
been produced, to avoid duplicating already-enqueued partitions on engine
replay; update the user-facing docs/feature doc to clearly state that enabling
otlp_logs_partition_by_resource may result in partial delivery under broker
back-pressure (reduced durability compared to unpartitioned mode), describe the
observable behavior and suggested mitigations (e.g., disable partitioning or
configure topic/producer to reduce back-pressure), and reference the
partitioned-produce behavior so users can make an informed choice.
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Force-pushed from 223f713 to 42bf79a
This PR introduces a new config option to the Kafka output to split OTLP log records by resource: `otlp_logs_partition_by_resource` in `out_kafka` for `otlp_json` and `otlp_proto` log output.

Fluent Bit is licensed under Apache 2.0; by submitting this pull request I understand that this code will be released under the terms of that license.