enhancement(topology): add per-sink authoritative acknowledgement control#25326
enhancement(topology): add per-sink authoritative
acknowledgement control#25326connoryy wants to merge 14 commits intovectordotdev:masterfrom
Conversation
Add an `authoritative` field to `AcknowledgementsConfig` that controls whether a sink participates in the source's acknowledgement chain. In fan-out topologies, sources normally wait for ALL connected sinks before acknowledging events. Setting `authoritative: false` on a sink strips event finalizers at the topology layer, causing the source's BatchNotifier to resolve without waiting for that sink. The field defaults to the value of `enabled`, preserving full backwards compatibility. Only explicitly setting `authoritative = false` on an ack-enabled sink changes behavior. This implements the "authoritative sink" concept from the original end-to-end acknowledgement RFC (6517), addressing the long-standing issue where slow or failing non-critical sinks block acknowledgement of critical sinks.
Change authoritative default from enabled() to false, matching RFC 6517: "If no sink indicates it is authoritative, all sinks must finalize the event before an acknowledgement may be sent." Stripping only activates when at least one sink explicitly sets authoritative: true. When no sink has authoritative set, all sinks participate in the ack chain (full backwards compatibility). This means: - No authoritative set anywhere → current behavior, no stripping - authoritative: true on one sink → only that sink blocks source acks, all others have finalizers stripped - authoritative: true on multiple sinks → source waits for all of them
Previously, finalizer stripping for non-authoritative sinks happened in build_sinks() after the buffer, right before sink.run() processes events. This was too late: stateful transforms like aggregate hold event clones with finalizer references in internal state, preventing the source's BatchNotifier from resolving even after the authoritative sink finishes. This commit moves stripping to the Fanout level -- the point where events are distributed to downstream components. At topology build time, a BFS walks backward from authoritative sinks through the config graph to determine which components are on authoritative paths. Each Fanout sender is tagged with a strip_finalizers flag: true if the downstream component is NOT on an authoritative path. When a Fanout distributes events, copies destined for non-authoritative paths have their finalizers dropped in Sender::flush() before entering the downstream buffer. Changes: - src/config/mod.rs: Add compute_authoritative_components() BFS method - lib/vector-core/src/fanout.rs: Per-sender strip_finalizers flag, stripping in Sender::flush(), strip_flags map on Fanout for persistence across Replace ops, updated ControlMessage::Add to carry strip flag - src/topology/builder.rs: Compute authoritative set in Builder::build(), store on TopologyPieces, remove sink-level stripping from build_sinks() - src/topology/running.rs: authoritative_components field on RunningTopology, strip flag passed in setup_inputs() and reattach_severed_inputs() via ControlMessage::Add - lib/vector-tap/src/controller.rs: Updated for new ControlMessage::Add signature (tap sinks never strip)
B1: Fix doc comment on `authoritative` field to say "Defaults to `false` per RFC 6517" and explain backwards compatibility via `compute_authoritative_components()` returning `None`. Remove redundant `#[serde(default)]` attribute. B2: Introduce `SenderSlot` struct in fanout.rs that pairs `Option<Sender>` with `strip_finalizers: bool`, eliminating the parallel `strip_flags` HashMap. The strip flag now travels with the slot and survives pause/replace cycles without a separate lookup. S1: Convert `ControlMessage::Add` from tuple variant to named-field struct variant for clarity. Update all pattern matches in fanout.rs, running.rs, and controller.rs. S2: Add explanatory comment in `compute_authoritative_components()` BFS about how named outputs (routes) are handled. S6: Rewrite changelog to accurately state that `authoritative` defaults to `false`, the feature only activates when at least one sink sets `authoritative: true`, and there is zero behavioral change otherwise. N1: Fix import ordering in fanout.rs -- group `vector_common` with `vector_buffers` above the `crate::` import. N6: Add `BatchStatus::Delivered` assertion to `fanout_strip_flag_preserved_across_replace` test.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c11c9dd62e
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| // sequence of control operations has been applied. | ||
| assert!( | ||
| sender.replace(Sender::new(sink)).is_none(), | ||
| slot.sender.replace(Sender::new(sink, strip)).is_none(), |
There was a problem hiding this comment.
Recompute strip flag when replacing fanout senders
replace always reuses the previously stored slot.strip_finalizers, so a reload that changes acknowledgement authority does not update existing fanout links that go through pause/replace (the normal path for unchanged wiring in RunningTopology::setup_inputs). In practice, toggling authoritative (or introducing the first authoritative sink) via hot reload can leave old strip behavior in place until a full restart, causing stale ack semantics where non-authoritative sinks may still block or newly authoritative paths may still be stripped.
Useful? React with 👍 / 👎.
…pstream-authoritative-acks
…acks' into connor/upstream-authoritative-acks
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 24e9db5991
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| .filter(|(_, sink)| { | ||
| sink.inner | ||
| .acknowledgements() | ||
| .merge_default(&self.global.acknowledgements) | ||
| .authoritative() |
There was a problem hiding this comment.
Require acknowledgements.enabled for authoritative sinks
compute_authoritative_components marks sinks authoritative using only authoritative(), but source ack propagation is still gated by enabled() in propagate_acknowledgements (src/config/mod.rs:214-223). This lets a sink with acknowledgements.enabled = false and authoritative = true become authoritative, which can strip finalizers from sinks that are actually ack-enabled and make sources wait on (or prioritize) a sink that was explicitly opted out of end-to-end acknowledgements.
Useful? React with 👍 / 👎.
| let strip_finalizers = self | ||
| .authoritative_components | ||
| .as_ref() | ||
| .map(|auth| !auth.contains(key)) | ||
| .unwrap_or(false); |
There was a problem hiding this comment.
Preserve legacy acking for pipelines without auth sinks
This strip decision is global once authoritative_components is Some: every component not in that set has finalizers removed. If one authoritative sink exists elsewhere in the topology, a source whose downstream graph contains only non-authoritative sinks will still have all of its branches stripped and can acknowledge events before any of its connected sinks finalize them, instead of retaining the previous wait-for-all behavior for that source path.
Useful? React with 👍 / 👎.
A sink with authoritative: true but enabled: false should not activate the stripping feature. Without this check, such a sink would cause other ack-enabled sinks to have their finalizers stripped while the 'authoritative' sink doesn't actually participate in acks (because propagate_acks_rec only traces from enabled sinks).
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f609fead1d
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| .send(fanout::ControlMessage::Add { | ||
| id: ComponentKey::from(sink_id.as_str()), | ||
| sender: tap_buffer_tx, | ||
| strip_finalizers: false, | ||
| }) |
There was a problem hiding this comment.
Strip finalizers for tap fanout attachments
When authoritative acknowledgements are enabled, tap sinks are still attached with strip_finalizers: false, so tapped events keep finalizers even though taps are not authoritative sinks. In a topology where sources wait on acknowledgements, a slow or stalled vector tap consumer can therefore hold batch finalizers and delay/override the intended "only authoritative sinks block ack" behavior. This affects runs with active tap subscriptions and can make acknowledgement latency depend on observability tooling rather than configured authoritative sinks.
Useful? React with 👍 / 👎.
…horitative sinks Use per-edge strip decisions instead of per-component: only strip when the upstream component IS in the authoritative set and the downstream is NOT. This preserves legacy wait-for-all behavior for sources whose downstream graph has no authoritative sinks, even when other pipelines in the same topology use authoritative mode. Also fixes tap fanout attachments: tap sinks now use per-edge stripping so they cannot block acknowledgements in authoritative mode.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 6257497c8d
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| pub fn merge_default(&self, other: &Self) -> Self { | ||
| let enabled = self.enabled.or(other.enabled); | ||
| Self { enabled } | ||
| let authoritative = self.authoritative.or(other.authoritative); |
There was a problem hiding this comment.
Detect conflicting authoritative ack defaults during merge
Merging authoritative with self.authoritative.or(other.authoritative) silently picks one value when multiple config files set different global.acknowledgements.authoritative values. Unlike enabled, there is no corresponding conflict check in GlobalOptions::merge, so contradictory defaults can be accepted and change runtime ack-chain behavior based on merge order instead of failing fast. This is user-visible in multi-file deployments that rely on deterministic global ack semantics.
Useful? React with 👍 / 👎.
Summary
Implements per-sink "authoritative" acknowledgement control, as proposed in RFC 6517 but never built.
When a source has end-to-end acknowledgements enabled, it currently waits for all connected sinks to finalize events before acknowledging — including sinks that don't need delivery guarantees. This PR adds an
authoritativefield to the sink acknowledgements config. When at least one sink setsauthoritative: true, only authoritative sinks participate in the ack chain. Non-authoritative sinks have their event finalizers stripped at the fan-out level, so the source'sBatchNotifierresolves as soon as all authoritative sinks have processed the events.When no sink sets
authoritative: true, behavior is identical to current Vector — zero breaking changes.Motivation
This addresses a long-standing design limitation where a slow, failing, or unreachable non-critical sink blocks acknowledgements for critical sinks:
#10983 and #11346 moved ack config from sources to sinks but explicitly did not change the wait-for-all runtime behavior. This PR completes that work.
Approach
Config
A single
Option<bool>field added toAcknowledgementsConfig:All 63 sink configs pick this up automatically via the shared
AcknowledgementsConfigstruct. No individual sink changes needed.Runtime: fan-out level stripping
Non-authoritative sinks have their
EventFinalizerreferences stripped at the fan-out point — where events are distributed to downstream components — rather than at the sink input stream. This is critical because stateful transforms (likeaggregate,reduce) between the fan-out and the sink can hold events for extended periods. Sink-level stripping would be too late — the transform would hold finalizer references, blocking theBatchNotifier.The stripping calls
events.take_finalizers()inSender::flush()for non-authoritative senders. The dropped finalizers callupdate_batch(EventStatus::Dropped), which is a no-op onBatchStatus.Design decision: stripping vs RFC's "immediate status update"
RFC 6517 proposed having authoritative sinks immediately set
EventStatus::Recordedon finalization. This PR takes a different approach: strip finalizers from non-authoritative paths entirely. The semantic guarantee is identical (source only waits for authoritative sinks), but stripping is simpler:Backwards compatibility
The
authoritativefield defaults tofalse. The feature activates only when at least one sink explicitly setsauthoritative: true. When inactive:compute_authoritative_components()returnsNoneif self.strip_finalizersbranch inSender::flush()is never takenThis is enforced by the
any_sink_explicitly_authoritativeguard in the topology builder and verified by two dedicated backwards-compatibility tests.Performance
When inactive (no authoritative sinks): zero per-event overhead. The
strip_finalizersbool check inSender::flush()is alwaysfalseand branch-predicted away. Thecompute_authoritative_components()BFS runs once at topology build time.When active: the stripping work (
take_finalizers()+Arc::make_mut+ ref count operations) is the same work that would happen at the sink regardless — it's just done earlier. Non-authoritative transforms and buffers no longer carry unnecessaryArc<EventFinalizer>references, slightly reducing memory pressure.Vector configuration
How did you test this PR?
28 tests across 4 categories:
Config defaults and merging (2 tests):
authoritativedefaults tofalsefor all combinations of enabled/authoritative/unsetauthoritativeoverrides global; global falls through when local is NoneGraph walk / BFS (9 tests):
Nonewhen no sink is explicitly authoritative (backwards compat)Nonefor empty configNoneFan-out stripping mechanics (2 new + 8 existing):
End-to-end topology (2 tests):
authoritative_sink_controls_ack_chain: source acks when auth sink drains, even though non-auth sink holds finalizers foreverauthoritative_sink_blocks_ack_when_not_drained: source does NOT ack when auth sink holds finalizers, even though non-auth sink drains normallyAll tests:
cargo checkclean,cargo test -p vector-core --lib fanout::tests(10 pass),cargo test -p vector-core config::test(7 pass),cargo test -p vector config::authoritative_tests(9 pass),cargo test -p vector topology::test::authoritative_acks(2 pass).Change Type
Is this a breaking change?
Does this PR include user facing changes?
no-changeloglabel to this PR.References