enhancement(kubernetes_logs source): add end-to-end acknowledgement support #38

Open

connoryy wants to merge 16 commits into master from connor/upstream-k8s-logs-acks

Conversation

@connoryy (Owner) commented Apr 28, 2026

Summary

Adds end-to-end acknowledgement support to the kubernetes_logs source. When acknowledgements are enabled (via a downstream sink), file checkpoints only advance after downstream sinks confirm event delivery. This prevents data loss when the source crashes or restarts — unacknowledged events are re-read from the checkpoint position.

Based on initial work by @ganelo (Orri), cleaned up and rebased onto current master.

Motivation

The kubernetes_logs source currently returns can_acknowledge() -> false, meaning it cannot participate in Vector's end-to-end acknowledgement system. When a downstream sink fails or the source crashes, events between the last checkpoint and the crash point are lost — the checkpoint was advanced before delivery was confirmed.

The file source already supports acknowledgements using the same OrderedFinalizer + BatchNotifier pattern. This PR brings the same capability to kubernetes_logs, which shares the underlying file_source infrastructure.

Approach

The implementation mirrors the file source's acknowledgement pattern:

  1. can_acknowledge() returns true — enables the topology's ack propagation
  2. OrderedFinalizer<FinalizerEntry> — receives ack status from downstream sinks in order
  3. BatchNotifier per batch — attached to events before emitting; downstream sinks update status on delivery
  4. Checkpoint gated on ack — checkpoints.update() is only called when BatchStatus::Delivered is received (see the sketch after this list)
  5. Graceful shutdown — separate shutdown signal ties the finalizer stream to the checkpoint writer, ensuring all pending acks are processed before stopping
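
A minimal sketch of the gating loop in step 4, using simplified stand-ins for the PR's actual types (the real FinalizerEntry fields and checkpointer API may differ):

```rust
use futures::{Stream, StreamExt};

// Simplified stand-ins: the PR defines FinalizerEntry locally, and
// `Checkpoints` mirrors the file_source checkpointer view.
#[derive(Clone, Copy)]
pub struct FinalizerEntry {
    pub file_id: u64,
    pub offset: u64,
}

#[derive(PartialEq)]
pub enum BatchStatus {
    Delivered,
    Rejected,
    Errored,
}

pub struct Checkpoints;
impl Checkpoints {
    pub fn update(&self, _file_id: u64, _offset: u64) {
        // Persist the new read position for this file.
    }
}

// Drain ack statuses in order; only Delivered advances the checkpoint.
pub async fn gate_checkpoints_on_acks(
    mut acks: impl Stream<Item = (BatchStatus, FinalizerEntry)> + Unpin,
    checkpoints: Checkpoints,
) {
    while let Some((status, entry)) = acks.next().await {
        if status == BatchStatus::Delivered {
            // Every downstream sink confirmed delivery: safe to persist.
            checkpoints.update(entry.file_id, entry.offset);
        }
        // Rejected/Errored batches leave the checkpoint untouched, so the
        // data is re-read from the old position after a restart.
    }
}
```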

Key design decisions:

  • FinalizerEntry is defined locally (not imported from file source) since sources-kubernetes_logs doesn't depend on sources-file
  • Source::new_test() method added for mock-based testing with a pre-built Kubernetes client
  • The acknowledgements config field uses the standard SourceAcknowledgementsConfig + bool_or_struct pattern
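
The config wiring follows that pattern roughly as below (a sketch: import paths are approximate and the remaining fields are elided):

```rust
use serde::Deserialize;
use vector_lib::config::SourceAcknowledgementsConfig;
use vector_lib::serde::bool_or_struct;

#[derive(Deserialize)]
pub struct Config {
    /// Accepts both `acknowledgements: true` and the nested
    /// `acknowledgements: { enabled: true }` form.
    #[serde(default, deserialize_with = "bool_or_struct")]
    pub acknowledgements: SourceAcknowledgementsConfig,
    // ... remaining kubernetes_logs fields elided
}
```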

Vector configuration

```yaml
sources:
  kubernetes_logs:
    type: kubernetes_logs
    # Acknowledgements are controlled at the sink level.
    # When any downstream sink has acknowledgements enabled,
    # the source automatically participates.
```

No source-level configuration is needed. Acknowledgements activate automatically when a downstream sink has acknowledgements.enabled = true, via propagate_acknowledgements().
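
For illustration, a downstream sink opting in (the sink name, type, and fields are arbitrary; any sink with acknowledgement support works):

```yaml
sinks:
  s3_archive:
    type: aws_s3
    inputs: ["kubernetes_logs"]
    bucket: my-bucket
    region: us-east-1
    acknowledgements:
      enabled: true  # this alone activates acks in the upstream source
```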

How did you test this PR?

3 new tests + 51 existing tests pass:

| Test | What it proves |
| --- | --- |
| `file_start_position_server_restart_with_file_rotation_no_acknowledge` | Without acks: checkpoint advances immediately, no re-read after restart |
| `file_start_position_server_restart_with_file_rotation_acknowledged` | With acks: checkpoint advances after ack, no duplicates after restart |
| `checkpoint_does_not_advance_without_ack` | Core safety: rejected events → checkpoint does NOT advance → data re-read after restart |

Tests use a mock Kubernetes client (Source::new_test()) with a configurable logs directory, following the same pattern as the file source tests.
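
In outline, the setup looks like this (a skeleton only: the helper names and the new_test signature are assumptions based on the description above, not the PR's exact code):

```rust
#[tokio::test]
async fn checkpoint_does_not_advance_without_ack() {
    // Glob a tempdir instead of /var/log/pods.
    let logs_dir = tempfile::tempdir().unwrap();
    write_pod_log(logs_dir.path(), "line 1\n"); // hypothetical helper

    // Pre-built mock client: no kubeconfig/env-var resolution involved.
    let source = Source::new_test(mock_client(), logs_dir.path().to_path_buf());

    // Run the source, reject the emitted batch in place of a sink, restart,
    // and assert the same line is read again because the checkpoint never
    // advanced past it.
}
```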

All 51 existing kubernetes_logs tests pass unchanged.

Change Type

- [ ] Bug fix
- [x] New feature
- [ ] Dependencies
- [ ] Non-functional (chore, refactoring, docs)
- [ ] Performance

Is this a breaking change?

- [ ] Yes
- [x] No

Does this PR include user facing changes?

- [x] Yes. Please add a changelog fragment based on our guidelines.
- [ ] No. A maintainer will apply the no-changelog label to this PR.

References

enhancement(kubernetes_logs source): add end-to-end acknowledgement support

Wire acknowledgements through the kubernetes_logs source so file
checkpoints only advance after downstream sinks confirm delivery.
Adds SourceAcknowledgementsConfig, FinalizerEntry, and OrderedFinalizer
integration. Includes mock-based tests for the ack flow.
B1: Add Source::new_test() that accepts a pre-built Client and custom
    logs directory, bypassing kubeconfig/env-var resolution. Add
    logs_dir_override to K8sPathsProvider and path_helpers so tests
    can glob a tempdir instead of /var/log/pods.

S1: Remove dead AckingMode::Unfinalized variant and its handling code.

S3/S5: Remove SourceConfigTest trait entirely. The test now calls
       Source::new_test() directly, which is simpler and avoids
       duplicating SourceConfig's doc comments and method signatures.

S4: Add checkpoint_does_not_advance_without_ack test that verifies
    checkpoints do NOT advance when events are rejected (not acked).

N1: Combine super::super imports into a single use statement.

N2: Remove redundant #[cfg(any(test, feature = "all-integration-tests"))]
    inside the already-#[cfg(test)] module.

N3: Add Clone, Copy derives to FinalizerEntry.
Add required changelog fragment for kubernetes_logs acknowledgement
feature. Fix end-to-end acknowledgements URL in CUE documentation
to match the pattern used by all other source docs.
connoryy and others added 9 commits April 28, 2026 00:29
…otdev#25259)

* chore(ci): remove dead nightly artifact-redirect loop

The loop referenced an undefined $i (copy-paste from the release branch
where $i iterates over version tags). Under set -u the subshell errored,
the for loop received empty input, and the body never ran — so this has
been a no-op since it was added.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(ci): retry verify_artifact on CDN cache staleness

packages.timber.io is fronted by a CDN; after `aws s3 rm --recursive` +
`cp --recursive` on nightly/latest, the edge can keep serving stale
bytes for longer than the existing 30s VERIFY_TIMEOUT, failing `cmp`
and the whole job. wget's --retry-on-http-error=404 only retries 404s,
not a 200 with stale content.

Wrap the compare in an exponential-backoff retry loop
(1, 2, 4, 8, 16, 32s; 7 attempts, ~63s of total sleep) so a transient
stale-cache hit no longer fails the release.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Fetch and fast-forward the release branch before inspecting HEAD so
  `git show --stat HEAD` reflects origin, not a stale local tip.
- Use `git push --force-with-lease` when resetting the `website` branch
  to the release branch's HEAD; a plain `git push` is rejected as
  non-fast-forward, which is the expected outcome of `reset --hard` to
  a different branch.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
fix(codecs): centralize events_dropped emission for batch encoding errors (vectordotdev#25199)

* fix(codecs): centralize events_dropped emission for batch encoding errors

Move events_dropped emission from individual internal events inside
serializers to a single wrapper in (Transformer, BatchEncoder)::encode_input.
This ensures all batch encoding error paths (Arrow IPC and Parquet) consistently
emit events_dropped without requiring each new error path to remember to add it.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
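
Roughly, the wrapper centralizes the emission like this (a sketch: the method name and surrounding types are stand-ins for the real (Transformer, BatchEncoder)::encode_input, and Vector-internal imports are elided):

```rust
use std::io;

fn encode_batch_or_drop(
    encoder: &BatchEncoder,
    events: Vec<Event>,
    writer: &mut dyn io::Write,
) -> io::Result<usize> {
    let count = events.len();
    encoder.encode_batch(events, writer).inspect_err(|_error| {
        // Single emission point: any batch-encoding failure (Arrow IPC,
        // Parquet, or a future codec) drops the whole batch exactly once,
        // so no individual serializer has to remember its own emission.
        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
            count,
            reason: "Failed to encode batch.",
        });
    })
}
```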

* chore: fix formatting

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test(codecs): add unit test for type mismatch in Parquet encoding

Covers the build_record_batch ArrowJsonDecode error path where a schema
expects int64 but the event contains a string value.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(codecs): emit ComponentEventsDropped directly to avoid double-counting

Replace EncoderWriteError with a direct ComponentEventsDropped emission
in the batch encode wrapper. EncoderWriteError was incrementing
component_errors_total and logging "Failed writing bytes." which
double-counted errors (codec-specific events already increment
component_errors_total) and was misleading (the failure was encoding,
not writing).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* docs: clarify comment about error counting and overcount edge case

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(codecs): guard batch-drop imports behind codecs-arrow feature

Move ComponentEventsDropped and UNINTENTIONAL imports inside the
cfg(feature = "codecs-arrow") impl block to avoid unused import
errors when the feature is disabled.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* docs: clarify overcount edge case is a misconfiguration scenario

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore(codecs): emit codec event for build_record_batch failures

The new EncoderRecordBatchError fires from build_record_batch's
RecordBatchCreation and ArrowJsonDecode paths, so type-mismatch and
decoder-build failures emit a granular component_errors_total counter at
stage="sending" with a specific error_code, instead of relying solely on
the downstream SinkRequestBuildError at stage="processing".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(codecs): add changelog fragment for batch encoding event coverage

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(codecs): assert EncoderRecordBatchError fires on Arrow type mismatch

Drives (Transformer, BatchEncoder)::encode_input through ArrowStreamSerializer
with an Int64 schema field and a string-valued event to trigger the
ArrowJsonDecode path in build_record_batch. Asserts both EncoderRecordBatchError
and ComponentEventsDropped are recorded so a regression in either emission
fails the test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
fix(releasing): enable codecs-parquet in all release feature sets (vectordotdev#25321)

* fix(releasing): enable codecs-parquet in all release feature sets

The aws_s3 sink's `batch_encoding` field is gated behind the
`codecs-parquet` Cargo feature, but the v0.55.0 release artifacts did
not include that feature. Users running the precompiled binaries hit
`unknown field batch_encoding` even though the feature was advertised
in the v0.55.0 release notes.

Enable `codecs-parquet` in `target-base` (covers every Linux release
triple), `default` (macOS), and `default-msvc` (Windows), plus the
related aggregator features for consistency.

Closes vectordotdev#25313

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(releasing): correct changelog author handle

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(releasing): trim changelog wording

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(external docs): regenerate component docs with parquet codec

Adds the missing `docs::enum_tag_description` metadata on
`ParquetCompression` (required by the doc generator for internally
tagged enums) and regenerates the aws_s3 and clickhouse component
Cue files now that `codecs-parquet` and `codecs-arrow` are part of
the default release feature set.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: address review wording on batch encoding fields

Apply @maycmlee's suggestions in the source rustdocs (and regenerate
the affected Cue):

- aws_s3 `batch_encoding`: `e.g., Parquet` -> `for example, Parquet`
- arrow `allow_nullable_fields`: tighter wording for both enabled and
  disabled cases
- parquet `Zstd`/`Gzip` `level`: drop "currently" from "Vector
  currently supports"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(releasing): consolidate codecs-parquet into base feature

Every release feature aggregator (default, default-cmake, default-msvc,
default-musl, default-no-api-client, target-base) transitively pulls
`base` either directly or via `enable-api-client`/`enable-unix`. Drop
the redundant per-aggregator listings and add `codecs-parquet` to
`base` once so the feature can't be silently lost when a future
aggregator gets added.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(aws_s3, clickhouse): note supported batch_encoding codec per sink

The `batch_encoding` field on both sinks is typed as `Option<BatchSerializerConfig>`,
which advertises every codec variant in the generated reference. Each sink
only accepts a subset at runtime: aws_s3 rejects everything except
`parquet`, clickhouse rejects `parquet`. Call out the supported codec in
the field's rustdoc so the published reference matches what the sink
will actually accept.
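
In rustdoc terms, approximately (field and type names per the commit message; the exact attributes and wording in the diff may differ):

```rust
pub struct S3SinkConfig {
    /// Configures batch-level encoding (for example, Parquet).
    ///
    /// This sink only supports the `parquet` codec for `batch_encoding`;
    /// all other variants are rejected at runtime when the sink is built.
    pub batch_encoding: Option<BatchSerializerConfig>,
    // ... remaining fields elided
}
```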

A follow-up should replace the shared enum with sink-specific config
types so the type system enforces the constraint.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When auto_partial_merge is enabled (the default), partial log lines are
merged into a single event. Previously, each raw line received its own
BatchNotifier before the merge step. When the merger dropped fragment
events after extracting their bytes, those fragments' finalizers fired
as Delivered, advancing the checkpoint past unacknowledged data. If the
merged event was later rejected by a sink, earlier fragments had already
been checkpointed.

Fix: transfer finalizers from each fragment to the bucket event during
merge, so the merged event carries all fragments' finalizers. The
checkpoint only advances after the complete merged event is delivered.
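
In outline, the merge-side fix (the helper and import path are assumptions; the finalizer method names follow vector-core's finalization API):

```rust
use vector_lib::event::LogEvent;

// Fold one raw log fragment into the accumulating "bucket" event.
fn merge_fragment(bucket: &mut LogEvent, mut fragment: LogEvent) {
    // Move the fragment's message bytes into the bucket (assumed helper).
    append_message(bucket, &fragment);

    // Crucially, also move the fragment's finalizers. Dropping the fragment
    // afterwards no longer fires its finalizer as Delivered; the merged
    // event now carries every fragment's finalizer, so the checkpoint can
    // only advance once the whole merged line is acknowledged.
    let finalizers = fragment.metadata_mut().take_finalizers();
    bucket.metadata_mut().merge_finalizers(finalizers);
}
```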
