enhancement(topology): add per-sink authoritative acknowledgement control#39
Open
Conversation
Add an `authoritative` field to `AcknowledgementsConfig` that controls whether a sink participates in the source's acknowledgement chain. In fan-out topologies, sources normally wait for ALL connected sinks before acknowledging events. Setting `authoritative: false` on a sink strips event finalizers at the topology layer, causing the source's `BatchNotifier` to resolve without waiting for that sink.

The field defaults to the value of `enabled`, preserving full backwards compatibility. Only explicitly setting `authoritative = false` on an ack-enabled sink changes behavior.

This implements the "authoritative sink" concept from the original end-to-end acknowledgement RFC (6517), addressing the long-standing issue where slow or failing non-critical sinks block acknowledgement of critical sinks.
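The finalizer-stripping mechanism can be pictured with a toy model. None of the types below are Vector's; the real ones are `EventFinalizer`, `BatchNotifier`, and `Events::take_finalizers()`. The point is only that detaching and dropping a finalizer releases its hold on the batch:

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Toy stand-in for a batch notifier: resolved once every finalizer
/// referencing it has been dropped (pending reaches zero).
struct Notifier {
    pending: AtomicUsize,
}

/// Toy stand-in for an event finalizer; dropping it releases its hold
/// on the batch, which is what stripping at the topology layer does.
struct Finalizer {
    batch: Arc<Notifier>,
}

impl Drop for Finalizer {
    fn drop(&mut self) {
        self.batch.pending.fetch_sub(1, Ordering::AcqRel);
    }
}

struct Event {
    finalizer: Option<Finalizer>,
}

impl Event {
    /// Analogue of `take_finalizers()`: detach the finalizer so this copy
    /// of the event no longer blocks the batch.
    fn take_finalizer(&mut self) -> Option<Finalizer> {
        self.finalizer.take()
    }
}

fn main() {
    let batch = Arc::new(Notifier { pending: AtomicUsize::new(1) });
    let mut copy_for_non_auth_sink = Event {
        finalizer: Some(Finalizer { batch: Arc::clone(&batch) }),
    };

    // Stripping: detach and drop the finalizer before the copy is delivered...
    drop(copy_for_non_auth_sink.take_finalizer());

    // ...so the batch resolves without waiting on that sink.
    assert_eq!(batch.pending.load(Ordering::Acquire), 0);
    // The event itself still flows downstream, just without a finalizer.
    assert!(copy_for_non_auth_sink.finalizer.is_none());
}
```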
Change `authoritative` default from `enabled()` to `false`, matching RFC 6517: "If no sink indicates it is authoritative, all sinks must finalize the event before an acknowledgement may be sent." Stripping only activates when at least one sink explicitly sets `authoritative: true`. When no sink has `authoritative` set, all sinks participate in the ack chain (full backwards compatibility). This means:

- No `authoritative` set anywhere → current behavior, no stripping
- `authoritative: true` on one sink → only that sink blocks source acks, all others have finalizers stripped
- `authoritative: true` on multiple sinks → source waits for all of them
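The activation semantics above can be sketched as a small predicate. The names `SinkAcks`, `stripping_active`, and `blocks_source_acks` are hypothetical; Vector's real config type is `AcknowledgementsConfig`:

```rust
// Sketch of the activation semantics: stripping is opt-in, and an unset
// `authoritative` never changes behavior.
#[derive(Clone, Copy)]
struct SinkAcks {
    enabled: bool,
    authoritative: Option<bool>, // None = not set in config
}

/// Stripping only activates when at least one sink explicitly opts in.
fn stripping_active(sinks: &[SinkAcks]) -> bool {
    sinks.iter().any(|s| s.authoritative == Some(true))
}

/// Whether this sink blocks source acknowledgements once stripping is active.
fn blocks_source_acks(s: SinkAcks) -> bool {
    s.enabled && s.authoritative == Some(true)
}

fn main() {
    // No sink sets authoritative -> legacy wait-for-all behavior.
    let legacy = [SinkAcks { enabled: true, authoritative: None }; 2];
    assert!(!stripping_active(&legacy));

    // One sink opts in -> only it participates in the ack chain.
    let mixed = [
        SinkAcks { enabled: true, authoritative: Some(true) },
        SinkAcks { enabled: true, authoritative: None },
    ];
    assert!(stripping_active(&mixed));
    assert!(blocks_source_acks(mixed[0]));
    assert!(!blocks_source_acks(mixed[1]));
}
```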
Previously, finalizer stripping for non-authoritative sinks happened in `build_sinks()` after the buffer, right before `sink.run()` processes events. This was too late: stateful transforms like `aggregate` hold event clones with finalizer references in internal state, preventing the source's `BatchNotifier` from resolving even after the authoritative sink finishes.

This commit moves stripping to the `Fanout` level -- the point where events are distributed to downstream components. At topology build time, a BFS walks backward from authoritative sinks through the config graph to determine which components are on authoritative paths. Each `Fanout` sender is tagged with a `strip_finalizers` flag: `true` if the downstream component is NOT on an authoritative path. When a `Fanout` distributes events, copies destined for non-authoritative paths have their finalizers dropped in `Sender::flush()` before entering the downstream buffer.

Changes:

- `src/config/mod.rs`: add `compute_authoritative_components()` BFS method
- `lib/vector-core/src/fanout.rs`: per-sender `strip_finalizers` flag, stripping in `Sender::flush()`, `strip_flags` map on `Fanout` for persistence across `Replace` ops, updated `ControlMessage::Add` to carry the strip flag
- `src/topology/builder.rs`: compute the authoritative set in `Builder::build()`, store it on `TopologyPieces`, remove sink-level stripping from `build_sinks()`
- `src/topology/running.rs`: `authoritative_components` field on `RunningTopology`, strip flag passed in `setup_inputs()` and `reattach_severed_inputs()` via `ControlMessage::Add`
- `lib/vector-tap/src/controller.rs`: updated for the new `ControlMessage::Add` signature (tap sinks never strip)
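The backward walk can be sketched as a reverse BFS over the config graph. The graph representation and the function name here are simplified assumptions, not Vector's actual `compute_authoritative_components()`:

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Reverse BFS: starting from the authoritative sinks, walk input edges
/// backward to find every component that lies on an authoritative path.
/// `inputs` maps each component id to the ids of its upstreams.
fn authoritative_components(
    inputs: &HashMap<&str, Vec<&str>>,
    authoritative_sinks: &[&str],
) -> HashSet<String> {
    let mut seen: HashSet<String> = HashSet::new();
    let mut queue: VecDeque<&str> = authoritative_sinks.iter().copied().collect();
    while let Some(id) = queue.pop_front() {
        if seen.insert(id.to_string()) {
            for upstream in inputs.get(id).into_iter().flatten() {
                queue.push_back(*upstream);
            }
        }
    }
    seen
}

fn main() {
    // source -> aggregate -> critical_sink (authoritative)
    // source -> debug_sink (non-authoritative)
    let mut inputs = HashMap::new();
    inputs.insert("aggregate", vec!["source"]);
    inputs.insert("critical_sink", vec!["aggregate"]);
    inputs.insert("debug_sink", vec!["source"]);

    let on_path = authoritative_components(&inputs, &["critical_sink"]);
    assert!(on_path.contains("critical_sink"));
    assert!(on_path.contains("aggregate"));
    assert!(on_path.contains("source"));
    // debug_sink is not on any authoritative path, so its Fanout sender
    // would be tagged strip_finalizers = true.
    assert!(!on_path.contains("debug_sink"));
}
```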
- B1: Fix the doc comment on the `authoritative` field to say "Defaults to `false` per RFC 6517" and explain backwards compatibility via `compute_authoritative_components()` returning `None`. Remove the redundant `#[serde(default)]` attribute.
- B2: Introduce a `SenderSlot` struct in fanout.rs that pairs `Option<Sender>` with `strip_finalizers: bool`, eliminating the parallel `strip_flags` HashMap. The strip flag now travels with the slot and survives pause/replace cycles without a separate lookup.
- S1: Convert `ControlMessage::Add` from a tuple variant to a named-field struct variant for clarity. Update all pattern matches in fanout.rs, running.rs, and controller.rs.
- S2: Add an explanatory comment in the `compute_authoritative_components()` BFS about how named outputs (routes) are handled.
- S6: Rewrite the changelog to accurately state that `authoritative` defaults to `false`, that the feature only activates when at least one sink sets `authoritative: true`, and that there is zero behavioral change otherwise.
- N1: Fix import ordering in fanout.rs -- group `vector_common` with `vector_buffers` above the `crate::` import.
- N6: Add a `BatchStatus::Delivered` assertion to the `fanout_strip_flag_preserved_across_replace` test.
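The B2 refactor can be pictured with a simplified sketch; the real slot holds a buffered `Sender`, not the placeholder values used here:

```rust
/// Simplified sketch of the B2 refactor: the strip decision travels with
/// the sender slot instead of living in a parallel `strip_flags` HashMap,
/// so it survives pause/replace cycles without a separate lookup.
struct SenderSlot<S> {
    sender: Option<S>,      // None while the downstream is paused/replaced
    strip_finalizers: bool, // per-edge decision, fixed at attach time
}

impl<S> SenderSlot<S> {
    fn new(sender: S, strip_finalizers: bool) -> Self {
        Self { sender: Some(sender), strip_finalizers }
    }

    /// Pausing takes the sender but leaves the strip flag in place.
    fn pause(&mut self) -> Option<S> {
        self.sender.take()
    }

    fn replace(&mut self, sender: S) {
        self.sender = Some(sender);
    }
}

fn main() {
    let mut slot = SenderSlot::new("tx-to-debug-sink", true);
    let taken = slot.pause();
    assert_eq!(taken, Some("tx-to-debug-sink"));
    // The flag is retained across the pause/replace cycle.
    slot.replace("tx-to-debug-sink-v2");
    assert!(slot.strip_finalizers);
}
```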
Force-pushed from 914372d to ad83b95.
…otdev#25259)

* chore(ci): remove dead nightly artifact-redirect loop

  The loop referenced an undefined `$i` (copy-paste from the release branch where `$i` iterates over version tags). Under `set -u` the subshell errored, the `for` loop received empty input, and the body never ran — so this has been a no-op since it was added.

* chore(ci): retry verify_artifact on CDN cache staleness

  packages.timber.io is fronted by a CDN; after `aws s3 rm --recursive` + `cp --recursive` on nightly/latest, the edge can keep serving stale bytes for longer than the existing 30s VERIFY_TIMEOUT, failing `cmp` and the whole job. wget's `--retry-on-http-error=404` only retries 404s, not a 200 with stale content. Wrap the compare in an exponential-backoff retry loop (1, 2, 4, 8, 16, 32s; 7 attempts, ~63s of total sleep) so a transient stale-cache hit no longer fails the release.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Fetch and fast-forward the release branch before inspecting HEAD so `git show --stat HEAD` reflects origin, not a stale local tip.
- Use `git push --force-with-lease` when resetting the `website` branch to the release branch's HEAD; a plain `git push` is rejected as non-fast-forward, which is the expected outcome of `reset --hard` to a different branch.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…g errors (vectordotdev#25199)

* fix(codecs): centralize events_dropped emission for batch encoding errors

  Move events_dropped emission from individual internal events inside serializers to a single wrapper in `(Transformer, BatchEncoder)::encode_input`. This ensures all batch encoding error paths (Arrow IPC and Parquet) consistently emit events_dropped without requiring each new error path to remember to add it.

* chore: fix formatting

* test(codecs): add unit test for type mismatch in Parquet encoding

  Covers the `build_record_batch` ArrowJsonDecode error path where a schema expects int64 but the event contains a string value.

* fix(codecs): emit ComponentEventsDropped directly to avoid double-counting

  Replace EncoderWriteError with a direct ComponentEventsDropped emission in the batch encode wrapper. EncoderWriteError was incrementing component_errors_total and logging "Failed writing bytes.", which double-counted errors (codec-specific events already increment component_errors_total) and was misleading (the failure was encoding, not writing).

* docs: clarify comment about error counting and overcount edge case

* fix(codecs): guard batch-drop imports behind codecs-arrow feature

  Move ComponentEventsDropped and UNINTENTIONAL imports inside the `cfg(feature = "codecs-arrow")` impl block to avoid unused import errors when the feature is disabled.

* docs: clarify overcount edge case is a misconfiguration scenario

* chore(codecs): emit codec event for build_record_batch failures

  The new EncoderRecordBatchError fires from `build_record_batch`'s RecordBatchCreation and ArrowJsonDecode paths, so type-mismatch and decoder-build failures emit a granular component_errors_total counter at stage="sending" with a specific error_code, instead of relying solely on the downstream SinkRequestBuildError at stage="processing".

* chore(codecs): add changelog fragment for batch encoding event coverage

* test(codecs): assert EncoderRecordBatchError fires on Arrow type mismatch

  Drives `(Transformer, BatchEncoder)::encode_input` through ArrowStreamSerializer with an Int64 schema field and a string-valued event to trigger the ArrowJsonDecode path in `build_record_batch`. Asserts both EncoderRecordBatchError and ComponentEventsDropped are recorded so a regression in either emission fails the test.

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ctordotdev#25321)

* fix(releasing): enable codecs-parquet in all release feature sets

  The aws_s3 sink's `batch_encoding` field is gated behind the `codecs-parquet` Cargo feature, but the v0.55.0 release artifacts did not include that feature. Users running the precompiled binaries hit `unknown field batch_encoding` even though the feature was advertised in the v0.55.0 release notes. Enable `codecs-parquet` in `target-base` (covers every Linux release triple), `default` (macOS), and `default-msvc` (Windows), plus the related aggregator features for consistency.

  Closes vectordotdev#25313

* fix(releasing): correct changelog author handle

* chore(releasing): trim changelog wording

* chore(external docs): regenerate component docs with parquet codec

  Adds the missing `docs::enum_tag_description` metadata on `ParquetCompression` (required by the doc generator for internally tagged enums) and regenerates the aws_s3 and clickhouse component Cue files now that `codecs-parquet` and `codecs-arrow` are part of the default release feature set.

* docs: address review wording on batch encoding fields

  Apply @maycmlee's suggestions in the source rustdocs (and regenerate the affected Cue):
  - aws_s3 `batch_encoding`: "e.g., Parquet" -> "for example, Parquet"
  - arrow `allow_nullable_fields`: tighter wording for both enabled and disabled cases
  - parquet `Zstd`/`Gzip` `level`: drop "currently" from "Vector currently supports"

* refactor(releasing): consolidate codecs-parquet into base feature

  Every release feature aggregator (default, default-cmake, default-msvc, default-musl, default-no-api-client, target-base) transitively pulls `base` either directly or via `enable-api-client`/`enable-unix`. Drop the redundant per-aggregator listings and add `codecs-parquet` to `base` once so the feature can't be silently lost when a future aggregator gets added.

* docs(aws_s3, clickhouse): note supported batch_encoding codec per sink

  The `batch_encoding` field on both sinks is typed as `Option<BatchSerializerConfig>`, which advertises every codec variant in the generated reference. Each sink only accepts a subset at runtime: aws_s3 rejects everything except `parquet`, clickhouse rejects `parquet`. Call out the supported codec in the field's rustdoc so the published reference matches what the sink will actually accept. A follow-up should replace the shared enum with sink-specific config types so the type system enforces the constraint.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pstream-authoritative-acks
…acks' into connor/upstream-authoritative-acks
A sink with `authoritative: true` but `enabled: false` should not activate the stripping feature. Without this check, such a sink would cause other ack-enabled sinks to have their finalizers stripped while the "authoritative" sink doesn't actually participate in acks (because `propagate_acks_rec` only traces from enabled sinks).
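In sketch form, the guard tightens from "any sink sets `authoritative: true`" to the following (hypothetical `SinkAcks` type; Vector's guard is `any_sink_explicitly_authoritative` in the topology builder):

```rust
#[derive(Clone, Copy)]
struct SinkAcks {
    enabled: bool,
    authoritative: Option<bool>,
}

/// A sink only activates stripping if it is both ack-enabled and explicitly
/// authoritative; an `enabled: false` sink never traces acks, so letting it
/// activate stripping would strip finalizers with no sink left to wait on.
fn any_sink_explicitly_authoritative(sinks: &[SinkAcks]) -> bool {
    sinks.iter().any(|s| s.enabled && s.authoritative == Some(true))
}

fn main() {
    let misconfigured = [
        SinkAcks { enabled: false, authoritative: Some(true) },
        SinkAcks { enabled: true, authoritative: None },
    ];
    // Without the `enabled` check this would return true and strip the
    // second sink's finalizers even though no sink actually acks.
    assert!(!any_sink_explicitly_authoritative(&misconfigured));
}
```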
…horitative sinks

Use per-edge strip decisions instead of per-component: only strip when the upstream component IS in the authoritative set and the downstream is NOT. This preserves legacy wait-for-all behavior for sources whose downstream graph has no authoritative sinks, even when other pipelines in the same topology use authoritative mode.

Also fixes tap fanout attachments: tap sinks now use per-edge stripping so they cannot block acknowledgements in authoritative mode.
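The per-edge rule described above can be sketched as a single predicate (names hypothetical):

```rust
use std::collections::HashSet;

/// Per-edge decision: strip finalizers on an edge only when the upstream
/// component is on an authoritative path but the downstream component is
/// not. Sources with no authoritative sink downstream never appear in the
/// set, so all their edges keep finalizers (legacy wait-for-all behavior).
fn strip_on_edge(authoritative: &HashSet<&str>, upstream: &str, downstream: &str) -> bool {
    authoritative.contains(upstream) && !authoritative.contains(downstream)
}

fn main() {
    // Pipeline A uses authoritative mode; pipeline B does not.
    let set: HashSet<&str> = ["source_a", "critical_sink"].into_iter().collect();

    assert!(strip_on_edge(&set, "source_a", "debug_sink")); // strip
    assert!(!strip_on_edge(&set, "source_a", "critical_sink")); // keep
    // Pipeline B: neither endpoint in the set -> never strip.
    assert!(!strip_on_edge(&set, "source_b", "sink_b"));
}
```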
Summary
Implements per-sink "authoritative" acknowledgement control, as proposed in RFC 6517 but never built.
When a source has end-to-end acknowledgements enabled, it currently waits for all connected sinks to finalize events before acknowledging — including sinks that don't need delivery guarantees. This PR adds an `authoritative` field to the sink acknowledgements config. When at least one sink sets `authoritative: true`, only authoritative sinks participate in the ack chain. Non-authoritative sinks have their event finalizers stripped at the fan-out level, so the source's `BatchNotifier` resolves as soon as all authoritative sinks have processed the events.

When no sink sets `authoritative: true`, behavior is identical to current Vector — zero breaking changes.

Motivation
This addresses a long-standing design limitation where a slow, failing, or unreachable non-critical sink blocks acknowledgements for critical sinks:
vectordotdev#10983 and vectordotdev#11346 moved ack config from sources to sinks but explicitly did not change the wait-for-all runtime behavior. This PR completes that work.
Approach
Config
A single `Option<bool>` field added to `AcknowledgementsConfig`:

All 63 sink configs pick this up automatically via the shared `AcknowledgementsConfig` struct. No individual sink changes needed.

Runtime: fan-out level stripping

Non-authoritative sinks have their `EventFinalizer` references stripped at the fan-out point — where events are distributed to downstream components — rather than at the sink input stream. This is critical because stateful transforms (like `aggregate`, `reduce`) between the fan-out and the sink can hold events for extended periods. Sink-level stripping would be too late — the transform would hold finalizer references, blocking the `BatchNotifier`.

The stripping calls `events.take_finalizers()` in `Sender::flush()` for non-authoritative senders. The dropped finalizers call `update_batch(EventStatus::Dropped)`, which is a no-op on `BatchStatus`.

Design decision: stripping vs RFC's "immediate status update"
RFC 6517 proposed having authoritative sinks immediately set `EventStatus::Recorded` on finalization. This PR takes a different approach: strip finalizers from non-authoritative paths entirely. The semantic guarantee is identical (the source only waits for authoritative sinks), but stripping is simpler:

Backwards compatibility
The `authoritative` field defaults to `false`. The feature activates only when at least one sink explicitly sets `authoritative: true`. When inactive:

- `compute_authoritative_components()` returns `None`
- the `if self.strip_finalizers` branch in `Sender::flush()` is never taken

This is enforced by the `any_sink_explicitly_authoritative` guard in the topology builder and verified by two dedicated backwards-compatibility tests.
When inactive (no authoritative sinks): zero per-event overhead. The `strip_finalizers` bool check in `Sender::flush()` is always `false` and branch-predicted away. The `compute_authoritative_components()` BFS runs once at topology build time.

When active: the stripping work (`take_finalizers()` + `Arc::make_mut` + ref-count operations) is the same work that would happen at the sink regardless — it's just done earlier. Non-authoritative transforms and buffers no longer carry unnecessary `Arc<EventFinalizer>` references, slightly reducing memory pressure.

Vector configuration
How did you test this PR?
28 tests across 4 categories:
Config defaults and merging (2 tests):
- `authoritative` defaults to `false` for all combinations of enabled/authoritative/unset
- `authoritative` overrides global; global falls through when local is `None`

Graph walk / BFS (9 tests):

- `None` when no sink is explicitly authoritative (backwards compat)
- `None` for empty config
- `None`

Fan-out stripping mechanics (2 new + 8 existing):
End-to-end topology (2 tests):
- `authoritative_sink_controls_ack_chain`: source acks when the auth sink drains, even though the non-auth sink holds finalizers forever
- `authoritative_sink_blocks_ack_when_not_drained`: source does NOT ack when the auth sink holds finalizers, even though the non-auth sink drains normally

All tests:
`cargo check` clean, `cargo test -p vector-core --lib fanout::tests` (10 pass), `cargo test -p vector-core config::test` (7 pass), `cargo test -p vector config::authoritative_tests` (9 pass), `cargo test -p vector topology::test::authoritative_acks` (2 pass).
Is this a breaking change?
Does this PR include user facing changes?
`no-changelog` label to this PR.

References