Skip to content

enhancement(topology): add per-sink authoritative acknowledgement control#39

Open
connoryy wants to merge 21 commits intomasterfrom
connor/upstream-authoritative-acks
Open

enhancement(topology): add per-sink authoritative acknowledgement control#39
connoryy wants to merge 21 commits intomasterfrom
connor/upstream-authoritative-acks

Conversation

@connoryy
Copy link
Copy Markdown
Owner

Summary

Implements per-sink "authoritative" acknowledgement control, as proposed in RFC 6517 but never built.

When a source has end-to-end acknowledgements enabled, it currently waits for all connected sinks to finalize events before acknowledging — including sinks that don't need delivery guarantees. This PR adds an authoritative field to the sink acknowledgements config. When at least one sink sets authoritative: true, only authoritative sinks participate in the ack chain. Non-authoritative sinks have their event finalizers stripped at the fan-out level, so the source's BatchNotifier resolves as soon as all authoritative sinks have processed the events.

When no sink sets authoritative: true, behavior is identical to current Vector — zero breaking changes.

Motivation

This addresses a long-standing design limitation where a slow, failing, or unreachable non-critical sink blocks acknowledgements for critical sinks:

vectordotdev#10983 and vectordotdev#11346 moved ack config from sources to sinks but explicitly did not change the wait-for-all runtime behavior. This PR completes that work.

Approach

Config

A single Option<bool> field added to AcknowledgementsConfig:

pub struct AcknowledgementsConfig {
    enabled: Option<bool>,
    authoritative: Option<bool>,  // NEW — defaults to false per RFC 6517
}

All 63 sink configs pick this up automatically via the shared AcknowledgementsConfig struct. No individual sink changes needed.

Runtime: fan-out level stripping

Non-authoritative sinks have their EventFinalizer references stripped at the fan-out point — where events are distributed to downstream components — rather than at the sink input stream. This is critical because stateful transforms (like aggregate, reduce) between the fan-out and the sink can hold events for extended periods. Sink-level stripping would be too late — the transform would hold finalizer references, blocking the BatchNotifier.

The stripping calls events.take_finalizers() in Sender::flush() for non-authoritative senders. The dropped finalizers call update_batch(EventStatus::Dropped), which is a no-op on BatchStatus.

Design decision: stripping vs RFC's "immediate status update"

RFC 6517 proposed having authoritative sinks immediately set EventStatus::Recorded on finalization. This PR takes a different approach: strip finalizers from non-authoritative paths entirely. The semantic guarantee is identical (source only waits for authoritative sinks), but stripping is simpler:

  • No changes to the finalization state machine
  • No new status variants
  • Works with any sink type without per-sink code changes
  • Naturally handles stateful transforms (finalizers never enter the non-authoritative path)

Backwards compatibility

The authoritative field defaults to false. The feature activates only when at least one sink explicitly sets authoritative: true. When inactive:

  • compute_authoritative_components() returns None
  • No strip flags are set on any fan-out sender
  • The if self.strip_finalizers branch in Sender::flush() is never taken
  • Behavior is identical to current Vector

This is enforced by the any_sink_explicitly_authoritative guard in the topology builder and verified by two dedicated backwards-compatibility tests.

Performance

When inactive (no authoritative sinks): zero per-event overhead. The strip_finalizers bool check in Sender::flush() is always false and branch-predicted away. The compute_authoritative_components() BFS runs once at topology build time.

When active: the stripping work (take_finalizers() + Arc::make_mut + ref count operations) is the same work that would happen at the sink regardless — it's just done earlier. Non-authoritative transforms and buffers no longer carry unnecessary Arc<EventFinalizer> references, slightly reducing memory pressure.

Vector configuration

sinks:
  s3_archive:
    type: aws_s3
    inputs: ["transform"]
    acknowledgements:
      enabled: true
      authoritative: true
      # Source waits for this sink before acknowledging

  console_debug:
    type: console
    inputs: ["transform"]
    acknowledgements:
      enabled: true
      # No authoritative field — defaults to false
      # Source does NOT wait for this sink

  loki_monitoring:
    type: loki
    inputs: ["transform"]
    # No acknowledgements block at all
    # Source does NOT wait for this sink

How did you test this PR?

28 tests across 4 categories:

Config defaults and merging (2 tests):

  • authoritative defaults to false for all combinations of enabled/authoritative/unset
  • Local authoritative overrides global; global falls through when local is None

Graph walk / BFS (9 tests):

  • Returns None when no sink is explicitly authoritative (backwards compat)
  • Returns None for empty config
  • Linear pipeline: all components on auth path included
  • Fan-out: shared upstream marked authoritative; dedicated non-auth branch excluded
  • Multiple authoritative sinks: both paths included
  • Chain of transforms: entire chain included
  • Diamond topology: all components included
  • Multi-component realistic config with no authoritative: None

Fan-out stripping mechanics (2 new + 8 existing):

  • Non-auth sender stripped, auth sender keeps finalizers, BatchNotifier resolves only from auth path
  • Strip flag survives pause → replace cycle
  • All 8 existing fanout tests pass unchanged

End-to-end topology (2 tests):

  • authoritative_sink_controls_ack_chain: source acks when auth sink drains, even though non-auth sink holds finalizers forever
  • authoritative_sink_blocks_ack_when_not_drained: source does NOT ack when auth sink holds finalizers, even though non-auth sink drains normally

All tests: cargo check clean, cargo test -p vector-core --lib fanout::tests (10 pass), cargo test -p vector-core config::test (7 pass), cargo test -p vector config::authoritative_tests (9 pass), cargo test -p vector topology::test::authoritative_acks (2 pass).

Change Type

  • Bug fix
  • New feature
  • Dependencies
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Comment thread changelog.d/authoritative_sink_acks.enhancement.md Fixed
Add an `authoritative` field to `AcknowledgementsConfig` that controls
whether a sink participates in the source's acknowledgement chain. In
fan-out topologies, sources normally wait for ALL connected sinks before
acknowledging events. Setting `authoritative: false` on a sink strips
event finalizers at the topology layer, causing the source's BatchNotifier
to resolve without waiting for that sink.

The field defaults to the value of `enabled`, preserving full backwards
compatibility. Only explicitly setting `authoritative = false` on an
ack-enabled sink changes behavior.

This implements the "authoritative sink" concept from the original
end-to-end acknowledgement RFC (6517), addressing the long-standing
issue where slow or failing non-critical sinks block acknowledgement
of critical sinks.
Change authoritative default from enabled() to false, matching RFC 6517:
"If no sink indicates it is authoritative, all sinks must finalize the
event before an acknowledgement may be sent."

Stripping only activates when at least one sink explicitly sets
authoritative: true. When no sink has authoritative set, all sinks
participate in the ack chain (full backwards compatibility).

This means:
- No authoritative set anywhere → current behavior, no stripping
- authoritative: true on one sink → only that sink blocks source acks,
  all others have finalizers stripped
- authoritative: true on multiple sinks → source waits for all of them
Previously, finalizer stripping for non-authoritative sinks happened in
build_sinks() after the buffer, right before sink.run() processes events.
This was too late: stateful transforms like aggregate hold event clones
with finalizer references in internal state, preventing the source's
BatchNotifier from resolving even after the authoritative sink finishes.

This commit moves stripping to the Fanout level -- the point where events
are distributed to downstream components. At topology build time, a BFS
walks backward from authoritative sinks through the config graph to
determine which components are on authoritative paths. Each Fanout sender
is tagged with a strip_finalizers flag: true if the downstream component
is NOT on an authoritative path. When a Fanout distributes events, copies
destined for non-authoritative paths have their finalizers dropped in
Sender::flush() before entering the downstream buffer.

Changes:
- src/config/mod.rs: Add compute_authoritative_components() BFS method
- lib/vector-core/src/fanout.rs: Per-sender strip_finalizers flag,
  stripping in Sender::flush(), strip_flags map on Fanout for persistence
  across Replace ops, updated ControlMessage::Add to carry strip flag
- src/topology/builder.rs: Compute authoritative set in Builder::build(),
  store on TopologyPieces, remove sink-level stripping from build_sinks()
- src/topology/running.rs: authoritative_components field on
  RunningTopology, strip flag passed in setup_inputs() and
  reattach_severed_inputs() via ControlMessage::Add
- lib/vector-tap/src/controller.rs: Updated for new ControlMessage::Add
  signature (tap sinks never strip)
B1: Fix doc comment on `authoritative` field to say "Defaults to `false`
per RFC 6517" and explain backwards compatibility via
`compute_authoritative_components()` returning `None`. Remove redundant
`#[serde(default)]` attribute.

B2: Introduce `SenderSlot` struct in fanout.rs that pairs `Option<Sender>`
with `strip_finalizers: bool`, eliminating the parallel `strip_flags`
HashMap. The strip flag now travels with the slot and survives
pause/replace cycles without a separate lookup.

S1: Convert `ControlMessage::Add` from tuple variant to named-field
struct variant for clarity. Update all pattern matches in fanout.rs,
running.rs, and controller.rs.

S2: Add explanatory comment in `compute_authoritative_components()` BFS
about how named outputs (routes) are handled.

S6: Rewrite changelog to accurately state that `authoritative` defaults
to `false`, the feature only activates when at least one sink sets
`authoritative: true`, and there is zero behavioral change otherwise.

N1: Fix import ordering in fanout.rs -- group `vector_common` with
`vector_buffers` above the `crate::` import.

N6: Add `BatchStatus::Delivered` assertion to
`fanout_strip_flag_preserved_across_replace` test.
connoryy and others added 9 commits April 28, 2026 00:28
…otdev#25259)

* chore(ci): remove dead nightly artifact-redirect loop

The loop referenced an undefined $i (copy-paste from the release branch
where $i iterates over version tags). Under set -u the subshell errored,
the for loop received empty input, and the body never ran — so this has
been a no-op since it was added.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(ci): retry verify_artifact on CDN cache staleness

packages.timber.io is fronted by a CDN; after `aws s3 rm --recursive` +
`cp --recursive` on nightly/latest, the edge can keep serving stale
bytes for longer than the existing 30s VERIFY_TIMEOUT, failing `cmp`
and the whole job. wget's --retry-on-http-error=404 only retries 404s,
not a 200 with stale content.

Wrap the compare in an exponential-backoff retry loop
(1, 2, 4, 8, 16, 32s; 7 attempts, ~63s of total sleep) so a transient
stale-cache hit no longer fails the release.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Fetch and fast-forward the release branch before inspecting HEAD so
  `git show --stat HEAD` reflects origin, not a stale local tip.
- Use `git push --force-with-lease` when resetting the `website` branch
  to the release branch's HEAD; a plain `git push` is rejected as
  non-fast-forward, which is the expected outcome of `reset --hard` to
  a different branch.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…g errors (vectordotdev#25199)

* fix(codecs): centralize events_dropped emission for batch encoding errors

Move events_dropped emission from individual internal events inside
serializers to a single wrapper in (Transformer, BatchEncoder)::encode_input.
This ensures all batch encoding error paths (Arrow IPC and Parquet) consistently
emit events_dropped without requiring each new error path to remember to add it.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore: fix formatting

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test(codecs): add unit test for type mismatch in Parquet encoding

Covers the build_record_batch ArrowJsonDecode error path where a schema
expects int64 but the event contains a string value.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(codecs): emit ComponentEventsDropped directly to avoid double-counting

Replace EncoderWriteError with a direct ComponentEventsDropped emission
in the batch encode wrapper. EncoderWriteError was incrementing
component_errors_total and logging "Failed writing bytes." which
double-counted errors (codec-specific events already increment
component_errors_total) and was misleading (the failure was encoding,
not writing).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* docs: clarify comment about error counting and overcount edge case

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(codecs): guard batch-drop imports behind codecs-arrow feature

Move ComponentEventsDropped and UNINTENTIONAL imports inside the
cfg(feature = "codecs-arrow") impl block to avoid unused import
errors when the feature is disabled.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* docs: clarify overcount edge case is a misconfiguration scenario

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore(codecs): emit codec event for build_record_batch failures

The new EncoderRecordBatchError fires from build_record_batch's
RecordBatchCreation and ArrowJsonDecode paths, so type-mismatch and
decoder-build failures emit a granular component_errors_total counter at
stage="sending" with a specific error_code, instead of relying solely on
the downstream SinkRequestBuildError at stage="processing".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(codecs): add changelog fragment for batch encoding event coverage

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(codecs): assert EncoderRecordBatchError fires on Arrow type mismatch

Drives (Transformer, BatchEncoder)::encode_input through ArrowStreamSerializer
with an Int64 schema field and a string-valued event to trigger the
ArrowJsonDecode path in build_record_batch. Asserts both EncoderRecordBatchError
and ComponentEventsDropped are recorded so a regression in either emission
fails the test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ctordotdev#25321)

* fix(releasing): enable codecs-parquet in all release feature sets

The aws_s3 sink's `batch_encoding` field is gated behind the
`codecs-parquet` Cargo feature, but the v0.55.0 release artifacts did
not include that feature. Users running the precompiled binaries hit
`unknown field batch_encoding` even though the feature was advertised
in the v0.55.0 release notes.

Enable `codecs-parquet` in `target-base` (covers every Linux release
triple), `default` (macOS), and `default-msvc` (Windows), plus the
related aggregator features for consistency.

Closes vectordotdev#25313

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(releasing): correct changelog author handle

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(releasing): trim changelog wording

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(external docs): regenerate component docs with parquet codec

Adds the missing `docs::enum_tag_description` metadata on
`ParquetCompression` (required by the doc generator for internally
tagged enums) and regenerates the aws_s3 and clickhouse component
Cue files now that `codecs-parquet` and `codecs-arrow` are part of
the default release feature set.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs: address review wording on batch encoding fields

Apply @maycmlee's suggestions in the source rustdocs (and regenerate
the affected Cue):

- aws_s3 `batch_encoding`: `e.g., Parquet` -> `for example, Parquet`
- arrow `allow_nullable_fields`: tighter wording for both enabled and
  disabled cases
- parquet `Zstd`/`Gzip` `level`: drop "currently" from "Vector
  currently supports"

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(releasing): consolidate codecs-parquet into base feature

Every release feature aggregator (default, default-cmake, default-msvc,
default-musl, default-no-api-client, target-base) transitively pulls
`base` either directly or via `enable-api-client`/`enable-unix`. Drop
the redundant per-aggregator listings and add `codecs-parquet` to
`base` once so the feature can't be silently lost when a future
aggregator gets added.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(aws_s3, clickhouse): note supported batch_encoding codec per sink

The `batch_encoding` field on both sinks types as `Option<BatchSerializerConfig>`,
which advertises every codec variant in the generated reference. Each sink
only accepts a subset at runtime: aws_s3 rejects everything except
`parquet`, clickhouse rejects `parquet`. Call out the supported codec in
the field's rustdoc so the published reference matches what the sink
will actually accept.

A follow-up should replace the shared enum with sink-specific config
types so the type system enforces the constraint.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…acks' into connor/upstream-authoritative-acks
A sink with authoritative: true but enabled: false should not activate
the stripping feature. Without this check, such a sink would cause
other ack-enabled sinks to have their finalizers stripped while the
'authoritative' sink doesn't actually participate in acks (because
propagate_acks_rec only traces from enabled sinks).
…horitative sinks

Use per-edge strip decisions instead of per-component: only strip when
the upstream component IS in the authoritative set and the downstream
is NOT. This preserves legacy wait-for-all behavior for sources whose
downstream graph has no authoritative sinks, even when other pipelines
in the same topology use authoritative mode.

Also fixes tap fanout attachments: tap sinks now use per-edge stripping
so they cannot block acknowledgements in authoritative mode.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Sources should only wait to ack for sinks with acks enabled

4 participants