enhancement(kubernetes_logs source): add end-to-end acknowledgement support #38
Open
Conversation
…upport Wire acknowledgements through the kubernetes_logs source so file checkpoints only advance after downstream sinks confirm delivery. Adds SourceAcknowledgementsConfig, FinalizerEntry, and OrderedFinalizer integration. Includes mock-based tests for the ack flow.
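As background on the `SourceAcknowledgementsConfig` + `bool_or_struct` wiring mentioned above, here is a self-contained sketch of the bool-or-struct config pattern. `Config`, `AcksConfig`, and this `bool_or_struct` helper are simplified stand-ins written for illustration, not Vector's actual definitions:

```rust
use serde::{Deserialize, Deserializer};

// Stand-in for SourceAcknowledgementsConfig.
#[derive(Debug, Default, Deserialize)]
struct AcksConfig {
    #[serde(default)]
    enabled: bool,
}

// Accept either `acknowledgements = true` or `[acknowledgements] enabled = true`.
fn bool_or_struct<'de, D>(d: D) -> Result<AcksConfig, D::Error>
where
    D: Deserializer<'de>,
{
    #[derive(Deserialize)]
    #[serde(untagged)]
    enum Either {
        Bool(bool),
        Struct(AcksConfig),
    }
    Ok(match Either::deserialize(d)? {
        Either::Bool(enabled) => AcksConfig { enabled },
        Either::Struct(cfg) => cfg,
    })
}

#[derive(Debug, Deserialize)]
struct Config {
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: AcksConfig,
}

fn main() {
    let short: Config = toml::from_str("acknowledgements = true").unwrap();
    let long: Config = toml::from_str("[acknowledgements]\nenabled = true").unwrap();
    assert!(short.acknowledgements.enabled && long.acknowledgements.enabled);
    println!("both config shapes enable acknowledgements");
}
```

The point of the pattern is that users can write the common case as a bare boolean while still having a struct form available for future per-source options.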
B1: Add `Source::new_test()` that accepts a pre-built `Client` and a custom
logs directory, bypassing kubeconfig/env-var resolution. Add
`logs_dir_override` to `K8sPathsProvider` and `path_helpers` so tests
can glob a tempdir instead of `/var/log/pods`.
S1: Remove the dead `AckingMode::Unfinalized` variant and its handling code.
S3/S5: Remove the `SourceConfigTest` trait entirely. The test now calls
`Source::new_test()` directly, which is simpler and avoids
duplicating `SourceConfig`'s doc comments and method signatures.
S4: Add a `checkpoint_does_not_advance_without_ack` test that verifies
checkpoints do NOT advance when events are rejected (not acked);
see the sketch after this list.
N1: Combine `super::super` imports into a single `use` statement.
N2: Remove the redundant `#[cfg(any(test, feature = "all-integration-tests"))]`
inside the already-`#[cfg(test)]` module.
N3: Add `Clone`, `Copy` derives to `FinalizerEntry`.
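For reference, a minimal standalone model of the behavior the S4 test pins down; `BatchStatus`, `FinalizerEntry`, and `Checkpointer` below are simplified stand-ins for Vector's types, not the actual implementation:

```rust
use std::collections::BTreeMap;

#[derive(Clone, Copy, PartialEq, Debug)]
enum BatchStatus {
    Delivered,
    Rejected,
}

// Clone, Copy as in N3 above.
#[derive(Clone, Copy)]
struct FinalizerEntry {
    file_id: u64,
    offset: u64,
}

#[derive(Default)]
struct Checkpointer {
    offsets: BTreeMap<u64, u64>, // file id -> acknowledged byte offset
}

impl Checkpointer {
    /// Advance the checkpoint for a file only on confirmed delivery;
    /// a rejected batch leaves it untouched so the data is re-read on restart.
    fn handle_ack(&mut self, status: BatchStatus, entry: FinalizerEntry) {
        if status == BatchStatus::Delivered {
            self.offsets.insert(entry.file_id, entry.offset);
        }
    }
}

#[test]
fn checkpoint_does_not_advance_without_ack() {
    let mut cp = Checkpointer::default();
    let entry = FinalizerEntry { file_id: 1, offset: 4096 };

    cp.handle_ack(BatchStatus::Rejected, entry);
    assert_eq!(cp.offsets.get(&1), None); // no ack, no advance

    cp.handle_ack(BatchStatus::Delivered, entry);
    assert_eq!(cp.offsets.get(&1), Some(&4096)); // advances only after ack
}
```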
Add required changelog fragment for kubernetes_logs acknowledgement feature. Fix end-to-end acknowledgements URL in CUE documentation to match the pattern used by all other source docs.
Force-pushed from 47a6af3 to 0a9ebfb
…otdev#25259)

* chore(ci): remove dead nightly artifact-redirect loop

  The loop referenced an undefined `$i` (copy-paste from the release branch,
  where `$i` iterates over version tags). Under `set -u` the subshell errored,
  the `for` loop received empty input, and the body never ran, so this has
  been a no-op since it was added.

* chore(ci): retry verify_artifact on CDN cache staleness

  packages.timber.io is fronted by a CDN; after `aws s3 rm --recursive` +
  `cp --recursive` on nightly/latest, the edge can keep serving stale bytes
  for longer than the existing 30s `VERIFY_TIMEOUT`, failing `cmp` and the
  whole job. wget's `--retry-on-http-error=404` only retries 404s, not a 200
  with stale content. Wrap the compare in an exponential-backoff retry loop
  (1, 2, 4, 8, 16, 32s; 7 attempts, ~63s of total sleep) so a transient
  stale-cache hit no longer fails the release.
- Fetch and fast-forward the release branch before inspecting HEAD so
  `git show --stat HEAD` reflects origin, not a stale local tip.
- Use `git push --force-with-lease` when resetting the `website` branch to
  the release branch's HEAD; a plain `git push` is rejected as
  non-fast-forward, which is the expected outcome of `reset --hard` to a
  different branch.
…g errors (vectordotdev#25199)

* fix(codecs): centralize events_dropped emission for batch encoding errors

  Move events_dropped emission from individual internal events inside
  serializers to a single wrapper in (Transformer, BatchEncoder)::encode_input.
  This ensures all batch encoding error paths (Arrow IPC and Parquet)
  consistently emit events_dropped without requiring each new error path to
  remember to add it.

* chore: fix formatting

* test(codecs): add unit test for type mismatch in Parquet encoding

  Covers the build_record_batch ArrowJsonDecode error path where a schema
  expects int64 but the event contains a string value.

* fix(codecs): emit ComponentEventsDropped directly to avoid double-counting

  Replace EncoderWriteError with a direct ComponentEventsDropped emission in
  the batch encode wrapper. EncoderWriteError was incrementing
  component_errors_total and logging "Failed writing bytes.", which
  double-counted errors (codec-specific events already increment
  component_errors_total) and was misleading (the failure was encoding, not
  writing).

* docs: clarify comment about error counting and overcount edge case

* fix(codecs): guard batch-drop imports behind codecs-arrow feature

  Move ComponentEventsDropped and UNINTENTIONAL imports inside the
  cfg(feature = "codecs-arrow") impl block to avoid unused import errors when
  the feature is disabled.

* docs: clarify overcount edge case is a misconfiguration scenario

* chore(codecs): emit codec event for build_record_batch failures

  The new EncoderRecordBatchError fires from build_record_batch's
  RecordBatchCreation and ArrowJsonDecode paths, so type-mismatch and
  decoder-build failures emit a granular component_errors_total counter at
  stage="sending" with a specific error_code, instead of relying solely on
  the downstream SinkRequestBuildError at stage="processing".

* chore(codecs): add changelog fragment for batch encoding event coverage

* test(codecs): assert EncoderRecordBatchError fires on Arrow type mismatch

  Drives (Transformer, BatchEncoder)::encode_input through
  ArrowStreamSerializer with an Int64 schema field and a string-valued event
  to trigger the ArrowJsonDecode path in build_record_batch. Asserts both
  EncoderRecordBatchError and ComponentEventsDropped are recorded so a
  regression in either emission fails the test.
…ctordotdev#25321)

* fix(releasing): enable codecs-parquet in all release feature sets

  The aws_s3 sink's `batch_encoding` field is gated behind the
  `codecs-parquet` Cargo feature, but the v0.55.0 release artifacts did not
  include that feature. Users running the precompiled binaries hit
  `unknown field batch_encoding` even though the feature was advertised in
  the v0.55.0 release notes. Enable `codecs-parquet` in `target-base` (covers
  every Linux release triple), `default` (macOS), and `default-msvc`
  (Windows), plus the related aggregator features for consistency.

  Closes vectordotdev#25313

* fix(releasing): correct changelog author handle

* chore(releasing): trim changelog wording

* chore(external docs): regenerate component docs with parquet codec

  Adds the missing `docs::enum_tag_description` metadata on
  `ParquetCompression` (required by the doc generator for internally tagged
  enums) and regenerates the aws_s3 and clickhouse component Cue files now
  that `codecs-parquet` and `codecs-arrow` are part of the default release
  feature set.

* docs: address review wording on batch encoding fields

  Apply @maycmlee's suggestions in the source rustdocs (and regenerate the
  affected Cue):
  - aws_s3 `batch_encoding`: `e.g., Parquet` -> `for example, Parquet`
  - arrow `allow_nullable_fields`: tighter wording for both enabled and
    disabled cases
  - parquet `Zstd`/`Gzip` `level`: drop "currently" from "Vector currently
    supports"

* refactor(releasing): consolidate codecs-parquet into base feature

  Every release feature aggregator (default, default-cmake, default-msvc,
  default-musl, default-no-api-client, target-base) transitively pulls `base`
  either directly or via `enable-api-client`/`enable-unix`. Drop the
  redundant per-aggregator listings and add `codecs-parquet` to `base` once
  so the feature can't be silently lost when a future aggregator gets added.

* docs(aws_s3, clickhouse): note supported batch_encoding codec per sink

  The `batch_encoding` field on both sinks is typed as
  `Option<BatchSerializerConfig>`, which advertises every codec variant in
  the generated reference. Each sink only accepts a subset at runtime:
  aws_s3 rejects everything except `parquet`, and clickhouse rejects
  `parquet`. Call out the supported codec in the field's rustdoc so the
  published reference matches what the sink will actually accept. A follow-up
  should replace the shared enum with sink-specific config types so the type
  system enforces the constraint.
When auto_partial_merge is enabled (the default), partial log lines are merged
into a single event. Previously, each raw line received its own BatchNotifier
before the merge step. When the merger dropped fragment events after
extracting their bytes, those fragments' finalizers fired as Delivered,
advancing the checkpoint past unacknowledged data. If the merged event was
later rejected by a sink, earlier fragments had already been checkpointed.

Fix: transfer finalizers from each fragment to the bucket event during merge,
so the merged event carries all fragments' finalizers. The checkpoint only
advances after the complete merged event is delivered.
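A minimal sketch of the shape of that fix, with simplified stand-in types (`Finalizers` here models Vector's event finalizers, not the real API):

```rust
// When partial-line fragments are merged, move each fragment's finalizers
// onto the merged event instead of letting them fire when the fragment drops.
#[derive(Default, Debug)]
struct Finalizers(Vec<u64>); // batch-notifier handles, simplified to ids

impl Finalizers {
    fn merge(&mut self, mut other: Finalizers) {
        self.0.append(&mut other.0);
    }
}

struct Fragment {
    bytes: Vec<u8>,
    finalizers: Finalizers,
}

/// Merge partial-line fragments into one event. The merged event carries
/// every fragment's finalizers, so the checkpoint only advances once the
/// whole merged line is acknowledged downstream.
fn merge_fragments(fragments: Vec<Fragment>) -> Fragment {
    let mut bytes = Vec::new();
    let mut finalizers = Finalizers::default();
    for mut frag in fragments {
        bytes.append(&mut frag.bytes);
        // Previously the fragment was dropped here and its finalizers fired
        // as Delivered prematurely; now they transfer to the merged event.
        finalizers.merge(std::mem::take(&mut frag.finalizers));
    }
    Fragment { bytes, finalizers }
}

fn main() {
    let merged = merge_fragments(vec![
        Fragment { bytes: b"partial ".to_vec(), finalizers: Finalizers(vec![1]) },
        Fragment { bytes: b"line\n".to_vec(), finalizers: Finalizers(vec![2]) },
    ]);
    assert_eq!(merged.finalizers.0, vec![1, 2]);
    println!("merged event carries {} finalizers", merged.finalizers.0.len());
}
```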
Summary

Adds end-to-end acknowledgement support to the `kubernetes_logs` source. When
acknowledgements are enabled (via a downstream sink), file checkpoints only
advance after downstream sinks confirm event delivery. This prevents data loss
when the source crashes or restarts: unacknowledged events are re-read from
the checkpoint position.

Based on initial work by @ganelo (Orri), cleaned up and rebased onto current
master.
Motivation
The `kubernetes_logs` source currently returns `can_acknowledge() -> false`,
meaning it cannot participate in Vector's end-to-end acknowledgement system.
When a downstream sink fails or the source crashes, events between the last
checkpoint and the crash point are lost: the checkpoint was advanced before
delivery was confirmed.

The `file` source already supports acknowledgements using the same
`OrderedFinalizer` + `BatchNotifier` pattern. This PR brings the same
capability to `kubernetes_logs`, which shares the underlying `file_source`
infrastructure.

Approach
The implementation mirrors the `file` source's acknowledgement pattern:

- `can_acknowledge()` returns `true`, enabling the topology's ack propagation
- `OrderedFinalizer<FinalizerEntry>` receives ack status from downstream sinks
  in order
- A `BatchNotifier` per batch is attached to events before emitting;
  downstream sinks update status on delivery
- `checkpoints.update()` is only called when `BatchStatus::Delivered` is
  received

Key design decisions:

- `FinalizerEntry` is defined locally (not imported from the `file` source)
  since `sources-kubernetes_logs` doesn't depend on `sources-file`
- A `Source::new_test()` method is added for mock-based testing with a
  pre-built Kubernetes client
- The `acknowledgements` config field uses the standard
  `SourceAcknowledgementsConfig` + `bool_or_struct` pattern
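For orientation, a simplified model of this flow, with a tokio oneshot channel standing in for the `BatchNotifier`/`OrderedFinalizer` machinery (all types below are illustrative stand-ins, not Vector's API):

```rust
use tokio::sync::oneshot;

#[derive(Debug, Clone, Copy)]
enum BatchStatus {
    Delivered,
    Rejected,
}

struct FinalizerEntry {
    file_id: u64,
    offset: u64,
}

#[tokio::main]
async fn main() {
    // Source side: one notifier per emitted batch, carrying the file offset
    // the checkpoint may advance to once the batch is acknowledged.
    let (tx, rx) = oneshot::channel::<BatchStatus>();
    let entry = FinalizerEntry { file_id: 7, offset: 2048 };

    // Sink side: after the batch is durably delivered, report the status.
    tokio::spawn(async move {
        tx.send(BatchStatus::Delivered).ok();
    });

    // Finalizer task: advance the checkpoint only on Delivered.
    let mut checkpoint: Option<u64> = None;
    if let Ok(BatchStatus::Delivered) = rx.await {
        checkpoint = Some(entry.offset); // the checkpoints.update() step
    }
    assert_eq!(checkpoint, Some(2048));
    println!("checkpoint for file {} advanced to {:?}", entry.file_id, checkpoint);
}
```

In the real source, `OrderedFinalizer` additionally guarantees that ack statuses are processed in file order, so a later batch's delivery cannot advance the checkpoint past an earlier, still-unacknowledged batch.

Vector configuration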
No source-level configuration is needed. Acknowledgements activate
automatically when a downstream sink has `acknowledgements.enabled = true`,
via `propagate_acknowledgements()`.
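For illustration, a minimal config showing the activation path (the `console` sink is just an example; any acknowledgement-capable sink behaves the same):

```toml
[sources.k8s]
type = "kubernetes_logs"
# no acknowledgement settings needed on the source itself

[sinks.out]
type = "console"
inputs = ["k8s"]
encoding.codec = "json"
# enabling acks on the downstream sink is what gates the source's checkpoints
acknowledgements.enabled = true
```

How did you test this PR?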
3 new tests + 51 existing tests pass:

- `file_start_position_server_restart_with_file_rotation_no_acknowledge`
- `file_start_position_server_restart_with_file_rotation_acknowledged`
- `checkpoint_does_not_advance_without_ack`

Tests use a mock Kubernetes client (`Source::new_test()`) with a configurable
logs directory, following the same pattern as the `file` source tests. All 51
existing `kubernetes_logs` tests pass unchanged.

Change Type
Is this a breaking change?
Does this PR include user facing changes?
If not, apply the `no-changelog` label to this PR.

References