out_gcs: Implement Google Cloud Storage(gcs) plugin#11792
out_gcs: Implement Google Cloud Storage(gcs) plugin#11792
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (2)
📝 WalkthroughWalkthroughAdds a new out_gcs output plugin and build option, implements on-disk buffering, OAuth/JWT authentication, upload queue and HTTP upload logic (gzip/MD5/ACL/options), integrates plugin into build and tests, and provides runtime tests for success and error/retry paths. ChangesGCS Output Plugin
Sequence DiagramsequenceDiagram
participant Flb as FluentBit
participant Plugin as out_gcs
participant Store as gcs_store
participant Queue as UploadQueue
participant OAuth as OAuth2
participant GCS as GoogleCloudStorage
Flb->>Plugin: flush(msgpack)
Plugin->>Store: write chunk (JSON)
Store-->>Plugin: gcs_file handle
Plugin->>Queue: enqueue(gcs_file)
Queue->>Store: read chunk content
Store-->>Queue: data buffer
Queue->>OAuth: get_token()
OAuth-->>Queue: access_token
Queue->>Plugin: build request (headers + payload)
Plugin->>GCS: POST /upload (Authorization: Bearer)
GCS-->>Plugin: response (200 / error)
Plugin->>Store: delete chunk (on success)
Plugin->>Queue: remove entry / schedule retry (on failure)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ffe6fb0502
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (2)
tests/runtime/out_gcs.c (2)
65-67: ⚡ Quick winClean up the temporary store directory after the test.
The test creates a temporary directory with
mkdtemp()but never removes it, leaving files in/tmp. Add cleanup code before returning.🧹 Proposed cleanup
After
flb_destroy(ctx)on line 98, add:/* Clean up temporary store directory */ char rm_cmd[256]; snprintf(rm_cmd, sizeof(rm_cmd), "rm -rf %s", store_dir); system(rm_cmd);Or use a more portable approach:
/* Clean up temporary store directory */ flb_utils_recursive_unlink(store_dir);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/runtime/out_gcs.c` around lines 65 - 67, The test creates a temp directory via store_dir and mkdtemp but never removes it; after flb_destroy(ctx) (the test teardown) add code to recursively remove store_dir — either by invoking flb_utils_recursive_unlink(store_dir) if available or by executing a safe platform call to remove the directory contents — ensuring the cleanup runs before the test returns so the /tmp directory is not left behind.
19-21: ⚡ Quick winClean up the temporary store directory after the test.
The test creates a temporary directory with
mkdtemp()but never removes it, leaving files in/tmp. Add cleanup code before returning.🧹 Proposed cleanup
After
flb_destroy(ctx)on line 51, add:/* Clean up temporary store directory */ char rm_cmd[256]; snprintf(rm_cmd, sizeof(rm_cmd), "rm -rf %s", store_dir); system(rm_cmd);Or use a more portable approach:
/* Clean up temporary store directory */ flb_utils_recursive_unlink(store_dir);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/runtime/out_gcs.c` around lines 19 - 21, The test creates a temporary directory (store_dir via mkdtemp) but never removes it; after flb_destroy(ctx) in the test teardown add cleanup to remove store_dir—either call flb_utils_recursive_unlink(store_dir) if available, or invoke a safe removal (e.g., build an rm -rf command with snprintf into a buffer and call system) to recursively delete the temporary directory; ensure you reference the same store_dir variable and perform the cleanup before the test returns.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/out_gcs/gcs_store.c`:
- Around line 55-68: The gcs_store_init currently creates a shared file store at
ctx->store_dir and always opens the same stream name "gcs_upload_buffer",
causing different out_gcs instances to share and drain each other's buffers;
update gcs_store_init to derive a unique stream namespace per instance (e.g.,
incorporate instance-specific fields such as ctx->name, ctx->bucket or an
internal instance id) and use that string when calling flb_fstore_stream_create
(ctx->fs_stream) so the sequence-index and buffered files live in an
instance-specific namespace; ensure the chosen identifier is stable across
restarts for that instance and still kept with ctx so cleanup and
flb_fstore_destroy behave correctly.
In `@plugins/out_gcs/gcs.c`:
- Around line 757-790: The generated key currently calls flb_get_s3_key(..., 0)
before ctx->seq_index is incremented, so $INDEX never appears in the object
name; move/perform the seq handling prior to key generation: if
ctx->key_fmt_has_seq_index, increment ctx->seq_index (or compute the next
sequence value) and pass that value as the final argument to flb_get_s3_key
instead of 0 so gcs_key/gcs_key_final includes the sequence, and keep the
existing write_seq_index(ctx->seq_index_file, ctx->seq_index) call to persist
the new value after successful use; update references to gcs_key, gcs_key_final,
ctx->seq_index, flb_get_s3_key and write_seq_index accordingly.
- Around line 56-82: The static header templates (content_type_header,
canned_acl_header, content_md5_header, storage_class_header) are mutated per
request and can race across out_gcs instances; move these into
gcs_upload_object() as local (stack) variables, initialize their
.key/.key_len/.val/.val_len per-request there, and use the local instances when
building the request instead of the global symbols; also remove or replace any
other references to the static globals so no code mutates shared header state.
- Around line 1084-1093: Check that flb_oauth2_create(...) and
flb_upstream_create(...) succeeded and fail init immediately if either returns
NULL: after assigning ctx->o = flb_oauth2_create(...) verify ctx->o != NULL (log
an error via flb_plg_error or process logger) and goto error on failure; do the
same after ctx->u = flb_upstream_create(...) to ensure ctx->u != NULL (and goto
error). Also ensure you clean up any partially initialized resources (e.g.,
destroy the mutex if token_mutex_initialized was set) when jumping to the error
path so there are no leaks.
- Around line 591-594: attach_recovered_chunk() backdates recovered entries but
add_to_queue() always overwrites entry->upload_time, causing recovered files to
be delayed; modify add_to_queue() so it only sets entry->upload_time when it is
not already initialized/backdated (e.g., if entry->upload_time == 0 or not in
the past), otherwise preserve the existing upload_time, then add the entry to
ctx->upload_queue; update the logic around upload_time in add_to_queue() (and
any callers) to ensure process_upload_queue() can pick up backdated entries
immediately.
- Around line 721-725: The code currently returns only the transport result from
flb_http_do (ret) which treats any completed HTTP response—including
401/403/5xx—as success; update the upload call path to inspect the HTTP response
status after flb_http_do (use the HTTP client/response available via the client
variable c or bytes/response fields) and treat only 2xx statuses as success
(return an error/non-zero for non-2xx so process_upload_queue() will retry),
while still calling flb_http_client_destroy(c) and
flb_upstream_conn_release(u_conn) to clean up; adjust the return value logic
around flb_http_do, the HTTP client 'c', and the response status check so that
non-2xx responses are not acknowledged as successful.
---
Nitpick comments:
In `@tests/runtime/out_gcs.c`:
- Around line 65-67: The test creates a temp directory via store_dir and mkdtemp
but never removes it; after flb_destroy(ctx) (the test teardown) add code to
recursively remove store_dir — either by invoking
flb_utils_recursive_unlink(store_dir) if available or by executing a safe
platform call to remove the directory contents — ensuring the cleanup runs
before the test returns so the /tmp directory is not left behind.
- Around line 19-21: The test creates a temporary directory (store_dir via
mkdtemp) but never removes it; after flb_destroy(ctx) in the test teardown add
cleanup to remove store_dir—either call flb_utils_recursive_unlink(store_dir) if
available, or invoke a safe removal (e.g., build an rm -rf command with snprintf
into a buffer and call system) to recursively delete the temporary directory;
ensure you reference the same store_dir variable and perform the cleanup before
the test returns.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b203c4a8-5359-459a-b781-d5c061e3be04
📒 Files selected for processing (10)
CMakeLists.txtcmake/plugins_options.cmakeplugins/CMakeLists.txtplugins/out_gcs/CMakeLists.txtplugins/out_gcs/gcs.cplugins/out_gcs/gcs.hplugins/out_gcs/gcs_store.cplugins/out_gcs/gcs_store.htests/runtime/CMakeLists.txttests/runtime/out_gcs.c
ffe6fb0 to
018a263
Compare
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
2fb74aa to
b2635b7
Compare
This PR adds Google Cloud Storage (gcs) plugin to send logs type of events into GCS buckets.
Previously, we have a PR for this purpose: #6984
But the PR seems to be stale so I had taken over that work.
The issue should be related #1032 and it's a long standing issue.
Enter
[N/A]in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
With macOS's leaks command, there's no leaks reported:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-testlabel to test for all targets (requires maintainer to do).Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Build
Tests